The Distributed Haskell Module for ghc-4.08+.

Todo:
- calling 'halt' won't stop all helper threads...

ENVIRONMENT:
  DHS_DHD
  DHS_DHDHOST
  DHS_HOSTNAME
  DHS_TIMEFIX
  DHS_NOSTRICTPARSE

The export list names the send operator as (!); 'send' is kept as an
alias for it further down in this module.

> module Distributed (
>   spawn, spawnLink,
>   send, (!),
>   receive,
#ifndef __HUGS__
>   receiveAfter,
#endif
>   self, proc,
>   start, distHaskellPortNr, Port, Node,
>   register, unregister, showRegistered,
>   dhdLookup, remoteSend, remoteLookup,
>   -- sendToLocal,
>   Distributed.linkMeWith, Distributed.unlinkMeFrom,
>   myGetVar, myGetVarDefault, halt,
>   Hostname (..), Nodename (..),
>   Servicename, Magic,
>   Serialize,
>   Distributed.runDist, get,
>   Pid (..), DIO) where

Basic imports:

#ifdef CONCDEBUG
> import ConcurrentDebug as Concurrent
#else
> import ConcurrentDebugLess as Concurrent
#endif

#ifndef __HUGS__
> import System.Posix
> import System.Process (runProcess)
> import Time
> import NetTCP as Network hiding (startup, Hostname)
> import qualified NetTCP as Network
> import qualified Data.Map as Map
> -- import qualified SocketPrim -- ouch!
> import Network.URI
> import DHSURI
> import DIOMonad
#else
> import NetFIFO as Network hiding (startup,myGetVar, Hostname)
> import qualified NetFIFO as Network
> import FiniteHugs as FiniteMap
#endif
> import IO -- hiding (catch, hClose, hGetLine, hPutStrLn, Handle)
> import qualified Control.Exception as Exception
> import Network.Socket (inet_ntoa, SockAddr(..), getSocketName)
> import qualified Network.Socket as Socket
> import Network (PortID(..), connectTo)
> -- import qualified Network as PrimNetwork
> -- import qualified SocketPrim
> import qualified Network.BSD as BSD
> import Maybe
> import Monad
> import Word

The Distributed Haskell modules:

> import DHSCore
> import ObjIO
> import PortListener hiding (startup)
> import qualified PortListener
> import Dictionary hiding (startup)
> import qualified Dictionary
> import Register hiding (startup)
> import qualified Register
> import LinkList hiding (startup,linkMeWith,unlinkMeFrom)
> import qualified LinkList
> import DHSOptions

You can provide your own transmission-protocol here:

> import Tunnel (bang,rsend,addConnection,Tunnel)
> import qualified Tunnel
> -- import Timeout

Messages in the channel used for serialization of asynchronous IO tasks.
We need to make sure that on

    p1 ! A
    p1 ! B

"B" does not overtake "A" on its way. The current implementation will
serialize everything, though, regardless of whether messages are sent to
different processes.

  IOSWait act  - run 'act', then continue with the next queued message
  IOSQuit      - halt the whole node (see 'activateIoSer')
  IOSExit      - stop the serializer thread of this process only

> data IOSer = IOSWait (IO ()) | IOSQuit | IOSExit

The DIO state & monad. It's hidden from the user, so don't worry.
Actually we could put everything except the mailbox and the pid into a
global state since these are the same for all processes.

'runDist' fills in selfPid, mailbox, ioSer and inCh for every new
process; the remaining fields are copied from the spawning process.

> data DIOState msg = DIOST {
>   dict :: Dictionary,
>   reg :: Register,
>   tun :: Tunnel,
>   selfPid :: PidInt,
>   node :: PNode,
>   mailbox :: [msg],
>   haltVar :: MVar (),
>   ioSer :: Chan IOSer,
>   net :: Net,
>   ll :: LinkList,
>   inCh :: StringChan}

> type DIO msg a = Dist (DIOState msg) a

Define our serialization class.
To provide reasonable defaults we use "read" and "show", unless the
programmer overrides these functions in his code.

> class (Read a,Show a) => Serialize a where
>   serialize :: a -> String
>   serialize = show
>   deserialize :: String -> [(a,String)]
>   deserialize = reads

'start f node' boots a Distributed Haskell node called 'node', runs 'f'
as its first process and blocks in the PortListener until the node is
halted. A node whose name is the empty string is not registered with the
dhd (see 'registerNode').

> start :: DIO msg () -> String -> IO ()
> start f node = do
#ifdef CONCDEBUG
>   putStr "CONCDEBUG\n"
>   labelProgram node
#endif
>   putStrLn "DHS: Starting"
>   putStr " Arguments"
>   args <- DHSOptions.getArgs

We have to set up some signal-handlers to ensure smooth operation.
sigINT must unregister with the dhd and kill the portListener so the
node exits.

>   putStr " Signals"
>   haltMVar <- newEmptyMVar
>   labelMVar haltMVar "haltMVar"
#ifndef __HUGS__

Note that the real signal handlers will be triggered when we finally run
the PortListener!

>   _ <- installHandler sigTERM (Catch (putMVar haltMVar () )) Nothing
>   _ <- installHandler sigINT (Catch (putMVar haltMVar () )) Nothing -- Ctrl-C.
>   _ <- installHandler sigHUP (Catch (putMVar haltMVar () )) Nothing
#endif
>   putStr " Dictionary"
>   dict <- Dictionary.startup
>   putStr " Register"
>   register <- Register.startup

If DHS_CTEST is set we open a connection to port 80 of that host before
the network layer starts. The handle is not used afterwards.
NOTE(review): presumably a connectivity test - confirm before removing.

>   preConnect <- myGetVar "DHS_CTEST"
>   h <- case preConnect of
>     Nothing -> return Nothing
>     Just host -> do
>       h <- connectTo host (PortNumber (fromIntegral 80))
>       return (Just h)
>   putStr " Network"
>   net <- Network.startup node
>   s <- (listen net) (iface args) Nothing
>   let portInt = s # port
>   putStr $ "@" ++ (show portInt)
>   putStr " Tunnel"
>   tun <- Tunnel.startup net portInt 30

Our startup mechanism. We want the node to stay alive even if the last
(user-) thread terminated. So we set up a MVar which gets written when
the portListener has started. *Then* the user-thread is started. We could
probably let the portListener fork the thread, but I like this one
better (it's cleaner, IMO).

Unluckily, we have to restructure this one day as it would look much
nicer if I wouldn't have to call listen *before* the PortListener. Sigh.

>   -- local <- getHostName -- You probably haven't heard of SOCKS yet,
>   -- cf. http://www.socks.nec.com/guidelines.html
>   -- #ifdef Hugs missing
>   sAddr <- getSocketName (s # socket)
>   local <- case sAddr of
>     (SockAddrInet port host) -> do
>       -- XXX I really don't know what I'm doing here:
>       let ip = (fromIntegral host)
>       case ip of
>         0 -> do -- INADDR_ANY. We've got to use the hostname's ip
>           defaultName <- BSD.getHostName
>           -- DHS_HOSTNAME overrides the name reported by the system
>           myhostname <- myGetVarDefault "DHS_HOSTNAME" defaultName
>           hostentry <- BSD.getHostByName myhostname
>           inet_ntoa (BSD.hostAddress hostentry)
>         _ -> return (addressToIP host)
>     _ -> net # getHostName -- XXX output a warning
>   -- print local
>   putStr " LinkList"
>   (ll,rcvLinkLevelMsg) <- LinkList.startup (local,portInt) (dict, tun # bang) -- need only plain
>   putStrLn "."

Try to register the node. If it fails, the entire program terminates.

>   registerNode net (Nodename node) local portInt

From here on the node is known to the dhd. Deregistration and the
shutdown of the subsystems therefore run via 'finally', so they also
happen when the first process cannot be started or the PortListener
dies with an exception. The exception itself is still propagated.

>   let shutdown = do
>         unregisterNode net (Nodename node) -- Disconnect from dhd
>         tun # Tunnel.stop
>         net # Network.stop
>         register # Register.stop
>         dict # Dictionary.stop
>       serve = do
>         syncMVar <- newEmptyMVar
>         labelMVar syncMVar "syncMVar"
>         -- The per-process fields are filled in by 'runDist'.
>         _ <- Distributed.runDist
>                (DIOST {
>                   dict = dict,
>                   reg = register,
>                   tun = tun,
>                   node = (local,portInt),
>                   haltVar = haltMVar,
>                   net = net,
>                   ll = ll,
>                   ioSer = undefined,
>                   mailbox = undefined,
>                   inCh = undefined,
>                   selfPid = undefined
>                 }) (proc (takeMVar syncMVar) >> f)
>         _ <- PortListener.startup net (dict,register,
>                [('!',rcvLinkLevelMsg), -- plugin for linking
>                 ('=',Tunnel.tunOpF (tun # Tunnel.thisMV)(ll # nodeFailed)) -- tunnels
>                 --('=',(tun#Tunnel.tunOp) (ll#nodeFailed))
>                ]) s syncMVar haltMVar
>         return ()
>   serve `Exception.finally` shutdown

The IO serialization framework.
'activateIoSer' is the body of the helper thread every process owns: it
runs the queued IO actions strictly in the order they were queued.
An IO error raised by a queued action is reported on stderr and the loop
continues; otherwise one failed send would silently drop every later
action of this process, including the IOSQuit written by 'halt'.

> activateIoSer :: MVar () -> Chan IOSer -> IO ()
> activateIoSer hv ch = do
>   msg <- readChan ch
>   case msg of
>     -- halt the system by exiting the main thread (portListener);
>     -- all remaining IO ops are discarded
>     IOSQuit -> putMVar hv ()
>     -- exit this process
>     IOSExit -> return ()
>     IOSWait m -> do
>       catch m (\e -> IO.hPutStrLn stderr $ "dhs: queued IO action failed: " ++ show e)
>       activateIoSer hv ch

serializeIO just puts the IO action into the queue. Looks nicer this way.

> serializeIO :: Chan IOSer -> IO () -> IO ()
> serializeIO ch x = writeChan ch (IOSWait x)

Overload runDist. Do initialization, clean up after a process exited.
This is the place where your ioError in a spawned thread will be caught.

The clean-up runs via 'finally': a process that dies from an exception
the plain 'catch' does not handle (a call to 'error', a failed pattern
match) is still removed from the process dictionary and its linked
processes are still notified.

> runDist :: DIOState msg -> (Dist (DIOState msg') ()) -> IO Pid
> runDist self f = do
>   let dic = self # dict
>   let link = self # ll
>   ch <- newChan
>   n <- (dic # allocate) ch
>   labelChan ch (show n)
>   let me = Pid (self # node) n

Funny serialization stub. In each forked IO we read from the channel to
activate the next process. To get things going we need this little
thread.

>   ioSerializeCh <- newChan
>   labelChan ioSerializeCh ((show n) ++ ".ioSer")
>   forkIOLabel ("IOSer." ++ (show n)) ( activateIoSer (self # haltVar) ioSerializeCh)

Copy static values, then fill in new stuff:

>   let body = DIOMonad.runDist (self {
>                  selfPid = n,
>                  mailbox = [],
>                  ioSer = ioSerializeCh,
>                  inCh = ch
>                }) f
>       report e = putStrLn $ "Error: " ++ (show e)
>       cleanup = do
>         writeChan ioSerializeCh IOSExit -- terminate I/O-thread
>         -- remove from process-dictionary:
>         (dic # delete) n
>         -- shortcut for local notification:
>         (link # iQuit) n
>         (link # pidFailed) me
>   forkIOLabel (show n) (catch body report `Exception.finally` cleanup)
>   return me

'registerNode' announces the node to the dhd as "%ip%node%port". A node
with an empty name is not registered at all. Any reply other than "ok"
aborts the program; the connection is closed first.

> registerNode :: Net -> Nodename -> String -> Port -> IO ()
> registerNode _ (Nodename "") _ _ = return ()
> registerNode net (Nodename node) ip p = do
>   putStr "Registering myself..."
>   res <- connectToDhd True
>   case res of
>     Nothing ->
>       ioError (userError "dhs: Can't start dhd, set $DHS_DHD to dhd-executable!")
>     Just s -> do
>       (s # writeH) ("%"++ip++"%"++node++"%"++(show p))
>       reply <- s # readH
>       case reply of
>         "ok" -> do
>           putStrLn "succeeded!"
>           s # closeH
>         _ -> do
>           s # closeH
>           error "already registered :("
>   where

'connectToDhd' does two passes because it might try to start the dhd. We
need the boolean 'first' to distinguish in which pass we are.

>     connectToDhd :: Bool -> IO (Maybe Handle)
>     connectToDhd first = do
>       host <- myGetVarDefault "DHS_DHDHOST" "localhost"
>       catch ((net # connect) host distHaskellPortNr >>= \x -> return (Just x))
>             (\e -> do
>                -- Probably no dhd running, try starting it
>                -- XXX WARNING! dhd will be killed on exit!?
#ifndef __HUGS__
>                if first && (host == "localhost")
>                  then do
>                    dhdExecutable <- myGetVarDefault "DHS_DHD" "dhd"
>                    -- the result of this 'catch' is discarded, so the
>                    -- placeholder returned by the handler is never used
>                    catch (runProcess dhdExecutable [] Nothing Nothing Nothing Nothing Nothing)
>                          (\e -> do
>                             putStrLn $ "*** " ++ (show e)
>                             return undefined
>                          )
>                    IO.hPutStr stderr "Trying to start dhd, waiting 2 seconds..."
>                    threadDelay (1000*1000*2)
>                    connectToDhd False
>                  else do
>                    return Nothing
>             )
#else
>                IO.hPutStrLn stderr "No DHD running!"
>                return Nothing
>             )
#endif

'unregisterNode' removes the node's name from the dhd ("/node"). A
failure is only reported on stderr; a node with an empty name was never
registered and is skipped.

> unregisterNode :: Net -> Nodename -> IO ()
> unregisterNode _ (Nodename "") = return ()
> unregisterNode net (Nodename node) = do
>   host <- myGetVarDefault "DHS_DHDHOST" "localhost"
>   catch (signOff host) complain
>   where
>     signOff host = do
>       s <- (net # connect) host distHaskellPortNr
>       (s # writeH) ("/"++node)
>       s # closeH
>     complain _ = IO.hPutStrLn stderr "dhs: Can't connect to dhd!"

Shuts down a Distributed Haskell node. The PortListener is
Concurrent-Haskell's main-thread which terminates if the MVar gets set.
The request travels through the IO serializer of the calling process, so
IO queued before 'halt' is still executed first.

> halt :: DIO msg ()
> halt = gets ioSer >>= \queue -> proc (writeChan queue IOSQuit)

'self' returns a process its own PID.

> self :: DIO msg Pid
> self = do
>   localId <- gets selfPid
>   here <- gets node
>   return (Pid here localId)

'spawn' creates a new DH-process and initializes its data-structures. We
have to allocate new channels for the replies of the rts.

> spawn :: DIO a () -> DIO b Pid
> spawn f = get >>= \st -> proc (Distributed.runDist st f)

SpawnLink. The link is created before starting the process.
Well, actually we add the link-call as the first instruction to the body
of the process.

> spawnLink :: DIO a () -> DIO b Pid
> spawnLink f = do
>   (Pid _ me) <- self
>   linkList <- gets ll
>   spawn (do
>     him <- self
>     proc $ (linkList # LinkList.linkMeWith) me him
>     f)

The Bang!-function sends a message to a process. We need to distinguish
between local and remote hosts. A named pid goes through 'remoteSend'; a
pid on this node is delivered directly via the process dictionary; any
other pid is sent through the tunnel from the IO serializer thread, and
a failing send marks the remote node as failed. The message itself is
returned.

> (!) :: (Serialize msg) => Pid -> msg -> DIO state msg
> (!) (PidName (host,node,name)) v =
>   remoteSend host node name v
> (!) pid@(Pid remote@(rname,rport) p) v = do
>   st <- get
>   if (st # node) == remote
>     then do
>       r <- proc $ ((dict st) # sendToLocal) p (serialize v)
>       return ()
>     else do
>       tunnel <- gets tun
>       let payload = "+"++(show p)++"+"++(serialize v)
>           deliver = do
>             _ <- (tunnel # bang) (plainToNode remote) payload
>             return ()
>           giveUp _ = do -- Ugly!
>             (nodeFailed (ll st)) remote
>             return ()
>       proc $ serializeIO (ioSer st) (catch deliver giveUp)
>   return v

We keep 'send' for compatibility:

> send :: Serialize b => Pid -> b -> DIO a b
> send = (!)

Registering allows processes to send messages to other processes using
their unique names rather than their PIDs.

XXX

> register :: String -> DIO msg Bool
> register name = do
>   registry <- gets reg
>   pid <- gets selfPid
>   proc $ (registry # rNew) name pid

> unregister :: String -> DIO msg ()
> unregister name = do
>   registry <- gets reg
>   proc $ (registry # rDelete) name

XXX

> showRegistered :: DIO a ()
> showRegistered = do
>   registry <- gets reg
>   proc $ registry # rDump

Connect to remote dhd and lookup a nodename. The request is "$node"; the
reply line is parsed as a 'Maybe Port'. A reply that does not parse is
treated like an unknown node (Nothing) instead of raising a lazy 'read'
error in whoever inspects the result. Connection errors still propagate
to the caller.

> dhdLookup :: Net -> Hostname -> Nodename -> IO (Maybe Port)
> dhdLookup net (Hostname host) (Nodename node) = do
>   putStrLn host
>   h <- (net # connect) host distHaskellPortNr
>   (h # writeH) $ "$" ++ node
>   line <- h # readH
>   h # closeH
>   return (parseReply line)
>   where
>     -- Same acceptance rule as 'read' (one complete parse, only
>     -- whitespace may follow), but total.
>     parseReply :: String -> Maybe Port
>     parseReply str =
>       case [r | (r, rest) <- reads str, ("", "") <- lex rest] of
>         [r] -> r
>         _ -> Nothing

Note the delimiters used to split up the message! Because of using '#'
the Haskell daemon can just strip the head and send off the remaining
data.
We could be more aggressive and check if we're the receiver and use
sendToLocal.

'remoteSend' addresses a registered service by name: the message
"*node#service#payload" is handed to the tunnel's 'rsend' from the IO
serializer thread. The message itself is returned.

> remoteSend :: Serialize b => Hostname -> Nodename -> Servicename -> b -> DIO a b
> remoteSend host (Nodename node) service v = do
>   let msg = ("*"++node++"#"++service++"#"++(serialize v))
>   ioCh <- gets ioSer
>   tunnel <- gets tun
>   proc $ serializeIO ioCh ((tunnel # rsend) host msg >> return ())
>   return v

> -- remoteSendURI :: Show b => URI -> b -> DIO a (Maybe b)
> -- remoteSendURI uriString v = do

XXX Does not use Tunnel yet.

'remoteLookup' resolves a service name to a Pid in two steps: ask the
dhd on 'host' for the port of the node, then ask that node for the
service. Every failure on the way - dhd unreachable, node unreachable,
unknown name, unparsable reply - yields Nothing.

> remoteLookup :: Hostname -> Nodename -> Servicename -> DIO a (Maybe Pid)
> remoteLookup h@(Hostname host) n@(Nodename node) service = do
>   network <- gets net
>   port <- proc $ catch (dhdLookup network h n) (\_ -> return Nothing)
>   case port of
>     Nothing -> return Nothing
>     Just p -> proc (catch (queryNode network p) (\_ -> return Nothing))
>   where
>     -- Ask the node listening on port p for the service's PidInt.
>     queryNode network p = do
>       conn <- (network # connect) host p
>       (conn # writeH) $ "$" ++ service
>       line <- conn # readH
>       conn # closeH
>       return (fmap (Pid (host,p)) (parseReply line))
>     -- Same acceptance rule as 'read' (one complete parse, only
>     -- whitespace may follow), but total.
>     parseReply :: String -> Maybe PidInt
>     parseReply str =
>       case [r | (r, rest) <- reads str, ("", "") <- lex rest] of
>         [r] -> r
>         _ -> Nothing

Process-Linking. If we request linking to a remote node, use a
LinkLevel-msg to contact the node. In any case, we retain a local copy of
the request in our LinkList as we will receive remote msgs via the
LinkLevel, too.

Request linking with a process. You will receive a message of type
"Down Pid" when the remote exits.

> linkMeWith :: Pid -> DIO a ()
> linkMeWith him = do
>   linkList <- gets ll
>   (Pid _ me) <- self
>   proc $ (linkList # LinkList.linkMeWith) me him

The corresponding unlink.

> unlinkMeFrom :: Pid -> DIO a ()
> unlinkMeFrom him = do
>   linkList <- gets ll
>   (Pid _ me) <- self
>   proc $ (linkList # LinkList.unlinkMeFrom) me him

XXX Foldify? Difficult...
'receive' first offers the messages already waiting in the mailbox to
'f'. Only if none of them is accepted does it read from the process's
input channel until a message is accepted. Messages 'f' does not accept
stay in the mailbox, in arrival order.

> receive :: (Serialize a) => (a -> (DIO a b)) -> DIO a b
> receive f = do
>   st <- get
>   (que',r') <- proc $ receive' (mailbox st) f
>   (expr,mbox) <- case r' of
>     Just e -> return (e, que')
>     Nothing -> do -- read from channel
>       (que'',r'') <- receiveChan (inCh st) f que'
>       return (r'',que'')
>   put st {mailbox = mbox} -- put back modified mbox
>   expr -- execute RHS
>   where
>     receiveChan :: (Serialize a) => StringChan -> (a -> b) -> [a] ->
>                    DIO a ([a],b)
>     receiveChan ch f que = readAndParse ch f que receiveChan id

We convert 't' to the time by which we'd like to be woken up. Note that a
last-minute message will supersede the timeout. Note that currently the
Times-module contains a bug handling TimeDiffs.

XXX BEWARE of well-known Time-bugs which won't work outside of GMT. Use
DHS_TIMEFIX to adjust the offset in hours. A DHS_TIMEFIX value that is
not a number is ignored (offset 0).

XXX Check Michael's fixes.

#ifndef __HUGS__
> receiveAfter :: (Serialize a) => Int -> DIO a b -> (a -> (DIO a b)) ->
>                 DIO a b
> receiveAfter t fafter f = do
>   time <- proc $ getClockTime
>   env <- proc $ myGetVarDefault "DHS_TIMEFIX" "0"
>   let dhsTimeFix :: Int
>       dhsTimeFix = case reads env of
>                      ((hours, _):_) -> hours
>                      [] -> 0
>   let ntime = addToClockTime (TimeDiff { tdYear=0, tdMonth=0, tdDay=0,
>                 tdHour=dhsTimeFix, tdMin=0, tdSec=t, tdPicosec=0}) time
>   receiveAfter' ntime
>
>   where
>     -- Try the mailbox first, exactly like 'receive'.
>     receiveAfter' finaltime = do
>       st <- get
>       (que',r') <- proc $ receive' (mailbox st) f
>       case r' of
>         Just e -> do
>           put (st {mailbox = que'})
>           e

Otherwise the mailbox shouldn't have changed as we haven't read anything
from the channel yet.

>         Nothing -> do
>           unless (and (zipWith ( \ a b -> (show a) == (show b)) que' (mailbox st))) (error "Sanity check failed!!!")
>           receiveAfter'' finaltime (inCh st) que'
>     -- Poll the channel until a message is accepted or the deadline
>     -- has passed; between two polls the thread yields.
>     receiveAfter'' finaltime inC msgs = do
>       st <- get
>       (que,r) <- receiveChan' inC f msgs
>       put st {mailbox = que}
>       case r of
>         Nothing -> do
>           curTime <- proc getClockTime
>           if curTime >= finaltime
>             then fafter
>             else do
>               proc yield
>               receiveAfter'' finaltime inC que
>         Just e -> e
>
>     -- Non-blocking variant of the channel read: Nothing as soon as
>     -- the channel is empty.
>     receiveChan' :: (Serialize a) => StringChan -> (a -> b) -> [a] ->
>                     DIO a ([a],Maybe b)
>     receiveChan' ch f que = do
>       noMail <- proc $ isEmptyChan ch
>       if noMail
>         then return (que,Nothing)
>         else readAndParse ch f que receiveChan' Just
#endif

'readAndParse' reads one raw message from the channel and deserializes
it. If 'f' accepts the value, the queue and the result (wrapped by 'm')
are returned; otherwise the value is appended to the queue and 'rcvFun'
decides how to go on. If 'deserialize' yields several parses the first
one is used.

> readAndParse :: (Serialize a) => StringChan -> (a -> b) -> [a] ->
>                 (StringChan -> (a -> b) -> [a] -> DIO a ([a],c)) ->
>                 (b -> c) -> DIO a ([a],c)
> readAndParse ch f que rcvFun m = do
>   str <- proc $ readChan ch
>   case deserialize str of
>     [] -> do
>       proc $ IO.hPutStrLn stderr $ "dhs: Received message has wrong type:\n" ++ str
>       var <- proc $ myGetVar "DHS_NOSTRICTPARSE"
>       case var of
>         Nothing -> do

XXX We can't use ioError here as the node will freeze. We have to print a
message ourselves and then call 'halt'. Yes, you're right, it's awful.
One day we will get around to raising our own exception here.

>           halt
>           proc yield
>           error "halted"
>           rcvFun ch f que -- XXX HELP!
>           -- There's no other way out of here?!
>         Just _ -> rcvFun ch f que
>     ((v,_):_) -> do
>       x <- proc $ trustMeElseFail f v
>       case x of
>         Nothing -> rcvFun ch f (que++[v])
>         Just r -> return (que,m r)

'receive''s task is to apply the function 'f' to the mailbox-items and
return its result. Both the cases that the mailbox is empty and that the
function didn't "understand" the message have to be checked.
The result is the mailbox without the first message 'f' accepted,
together with what 'f' made of that message, or the unchanged mailbox
and Nothing.

> receive' :: (Serialize msg) => [msg] -> (msg -> b) -> IO ([msg],Maybe b)

The empty mailbox:

> receive' [] _ = return ([], Nothing)

In case that the function used the message remove the message from the
queue; otherwise keep it and try the remaining messages:

> receive' (msg:rest) handler =
>   trustMeElseFail handler msg >>= maybe skip accept
>   where
>     accept result = return (rest, Just result)
>     skip = do
>       (rest', found) <- receive' rest handler
>       return (msg : rest', found)

The function throws an exception to indicate that it doesn't understand
the message. This saves trouble in DHS-clients as we no longer have to
use a preprocessor for adding the tedious "Just $" & "_ -> Nothing" in
the receive-blocks.

The result of 'f' is only forced to weak head normal form; any exception
raised while doing so is answered with Nothing.

> trustMeElseFail :: (Serialize a) => (a -> b) -> a -> IO (Maybe b)
> trustMeElseFail f v =
>   Exception.catch attempt (\_ -> return Nothing)
>   where
>     attempt = return . Just $! f v