> module LinkList
>   ( startup
>   , LinkList
>   , iQuit
>   , linkMeWith
>   , unlinkMeFrom
>   , pidFailed
>   , nodeFailed
>   ) where

> import DHSCore
> import ObjIO
> import Dictionary hiding (startup)
> import IO
> import Prelude hiding (lookup)
#ifdef CONCDEBUG
> import ConcurrentDebug as Concurrent
#else
> import ConcurrentDebugLess as Concurrent
#endif
> import Monad
> import Maybe
#ifndef __HUGS__
> import qualified Data.Map as Map
> import System.Posix.Signals (installHandler, sigUSR1, Handler (..) )
#else
> import FiniteHugs as FiniteMap
#endif

Liveness state of a watched remote process, as tracked by the pinger.

> data Thrice
>   = Alive
>   | Pending
>   | Cleanup
>   deriving (Read,Show)

Messages understood by the link-list thread.  In AddLink and RemoveLink
the PidInt names the local process that wants to be linked with (or
unlinked from) the process given by the Pid.  Notify informs all
processes in the list of the death of a remote process; SNotify does the
same for every process of a whole node.

> data Msgs
>   = AddLink Bool PidInt Pid
>   | RemoveAnyLink PidInt
>   | RemoveLink PidInt Pid
>   | Notify Pid
>   | SNotify PNode
>   | Dump
>   | Ping Pid PidInt
>   | Pong Pid
>   | Stop
>   deriving (Show, Read)

We define a sole constructor so that we can use read and show on an
algebraic datatype instead of handling raw strings.

> data Dummy = Down Pid
>   deriving (Show)

The operations a client can request from the link list.

> class LinkListC a where
>   iQuit        :: a -> PidInt -> IO ()
>   linkMeWith   :: a -> PidInt -> Pid -> IO ()
>   unlinkMeFrom :: a -> PidInt -> Pid -> IO ()
>   pidFailed    :: a -> Pid -> IO ()
>   nodeFailed   :: a -> PNode -> IO ()

Handle to the link list: the channel its main thread reads from.

> data LinkList = LLST { chan :: Chan Msgs }

Every operation just posts the corresponding message on that channel.

> instance LinkListC LinkList where
>   iQuit ll                = writeChan (chan ll) . RemoveAnyLink
>   linkMeWith ll me wwith  = writeChan (chan ll) $ AddLink False me wwith
>   unlinkMeFrom ll me from = writeChan (chan ll) $ RemoveLink me from
>   pidFailed ll            = writeChan (chan ll) . Notify
>   nodeFailed ll           = writeChan (chan ll) . SNotify

The first element indicates a node and contains a list of pids (just the
Int) which uniquely define a process, and for each of them a list of all
local processes linked to it:

node1 -----------------> node2 --------------> ....
|                        |
|                        |
-> #1234 -------> #1236 -> ...
|                 |
|                 |
-> Pid -> Pid     -> Pid -> Pid      -> #5555 -> #5556
                                     +> Pid   +> Pid

Outer key: node.  Inner key: pid on that node.  Value: liveness flag and
the local linkers, each with the number of links it currently holds.

> type LLDict = Map.Map (String,Port) (Map.Map PidInt (Thrice,[(Int,PidInt)]))

Create the link list.  Forks the main message loop and the pinger, and
returns the handle together with the receiver for link-level messages.
"this" is the local node, "dict" the process dictionary and "tun" sends a
string to another node.

> -- startup :: PNode -> (Dictionary.Dictionary, Node -> String -> IO ()) -> IO (LinkList,
> startup this (dict,tun) = do
>     ch <- newChan
>     labelChan ch "LinkList.In"
>     sync <- newMVar Map.empty
>     labelMVar sync "LinkList.sync"
#ifndef __HUGS__

Install handler for sigUSR1 to do a dump of all active links:

>     _ <- installHandler sigUSR1 (Catch (writeChan ch Dump)) Nothing
#endif
>     forkIOLabel "LinkList.Main" (linkList ch sync)

XXX read from Variable

>     forkIO (pinger 10 ch sync)  -- sets it's own labels!
>     let ll = LLST { chan = ch }
>     return (ll, rcvLinkLevelMsg ch)
>   where

Main loop: handle one message, then recurse.  Only Stop ends the loop.

>     linkList :: Chan Msgs -> MVar LLDict -> IO ()
>     linkList chans sync = do
>         msg <- readChan chans
>         case msg of
>           AddLink update from to@(Pid node p) -> do

We use the flag "update" for signalling the difference between a plain
user link and an update due to pinging.

>             -- putStrLn $ "link from " ++ (show from) ++ " to " ++ (show to)
>             list <- takeMVar sync

Does the target already exist?  If so, either bump the count of an
existing linker or register the new linker with count 1.

>             let entry = (Alive, [(1,from)])
>                 relink Nothing       = entry
>                 relink (Just (_,ps)) =
>                   (Alive, if update then ps else addLinker from ps)
>                 pids' = case Map.lookup node list of
>                   Nothing -> Map.singleton p entry
>                   Just t  -> Map.insert p (relink (Map.lookup p t)) t
>             putMVar sync (Map.insert node pids' list)
>             linkList chans sync
>           RemoveAnyLink from -> do
>             -- putStrLn "RemoveAnyLink"
>             list <- takeMVar sync
>             let dropLinker (flag,els) = (flag, filter (\(_,p) -> p /= from) els)
>             putMVar sync (Map.map (Map.map dropLinker) list)
>             linkList chans sync
>           RemoveLink from to@(Pid node p) -> do
>             -- putStrLn "RemoveLink"
>             list <- takeMVar sync

Decrement the link count (never below zero).  Entries whose count has
reached zero are weeded out by the pinger.

>             let decrement c = max 0 (c - 1)
>                 list' = case Map.lookup node list of
>                   Nothing   -> list
>                   Just pids -> case Map.lookup p pids of
>                     Nothing        -> list
>                     Just (flag,ls) ->
>                       Map.insert node
>                         (Map.insert p (flag, replaceAndOp decrement from ls) pids)
>                         list
>             putMVar sync list'
>             linkList chans sync
>           SNotify node -> do -- an entire node disappeared
>             -- putStrLn $ "notify " ++ (show node)
>             list <- takeMVar sync
>             list' <- case Map.lookup node list of
>               Nothing     -> return list
>               Just portFM -> do
>                 mapM_ (\(p,(_,pids)) -> mapM_ (notify dict (Pid node p)) pids)
>                       (Map.toList portFM)
>                 return (Map.delete node list)
>             putMVar sync list'
>             linkList chans sync
>           Notify rpid@(Pid node port) -> do
>             -- putStrLn $ "notifyPID" ++ (show rpid)
>             list <- takeMVar sync
>             list' <- case Map.lookup node list of
>               Nothing     -> return list
>               Just portFM -> case Map.lookup port portFM of
>                 Nothing       -> return list
>                 Just (_,pids) -> do
>                   mapM_ (notify dict rpid) pids
>                   return (Map.insert node (Map.delete port portFM) list)
>             putMVar sync list'
>             linkList chans sync
>           Ping (Pid rnode _) p -> do -- XXX Could be better
>             -- print $ "ping! " ++ (show p)
>             found <- (dict#lookup) p

XXX Remote...  This will reallocate a tunnel:

>             when (isJust found)
>                  (tun (plainToNode rnode) ("!" ++ (show (Pong (Pid this p)))))
>             linkList chans sync
>           Pong (Pid node p) -> do
>             -- print $ "pong! " ++ (show p)
>             list <- takeMVar sync
>             let list' = case Map.lookup node list of
>                   Just n -> case Map.lookup p n of
>                     Just (_,entry) -> Map.insert node (Map.insert p (Alive,entry) n) list
>                     Nothing        -> list
>                   Nothing -> list
>             putMVar sync list'
>             linkList chans sync
>           Dump -> do

Sending the dump to stdout doesn't make things easier.  If someone starts
using it, it should dump to a file instead, e.g. somewhere below
/tmp/lldump.

>             list <- takeMVar sync
>             dumpfunc list
>             putMVar sync list
>             linkList chans sync
>           Stop -> return () -- XXX Send remaining notifications
>       where

Print every watched process together with its linkers.

>         dumpfunc list = do
>           putStrLn "LinkList dump:"
>           mapFMM_ (\node ps _ ->
>                      mapFMM_ (\p pids _ -> do
>                                 putStrLn (show (Pid node p))
>                                 putStrLn (show pids)) ps)
>                   list

Apply op to the count of the first entry belonging to "from"; lists
without such an entry are returned unchanged.

>         replaceAndOp _ _ [] = []
>         replaceAndOp op from (x@(count,p):xs)
>           | p == from = ((op count,p):xs)
>           | otherwise = x:(replaceAndOp op from xs)

Count one more link for "from", adding it with count 1 if it is not in
the list yet.

>         addLinker from ps
>           | any (\(_,p) -> p == from) ps = replaceAndOp (+1) from ps
>           | otherwise                    = ps ++ [(1,from)]

Send a Down message for rpid to local process p, once per link it holds.

>     notify :: Dictionary.Dictionary -> Pid -> (Int,PidInt) -> IO ()
>     notify dict rpid (count,p) = do
>         putStrLn $ "Notifying " ++ (show p)
>         putStrLn $ "LinkList: Down-event from " ++ (show rpid)
>         w <- sequence (replicate count ((dict#sendToLocal) p (show (Down rpid))))
>         unless (or w) (putStrLn "LinkList: Non-existing process in notification-list?!")

Every "delay" seconds: age all entries, notify the linkers of entries
that did not answer the previous ping, then ping whatever remains.

>     pinger :: Int -> Chan Msgs -> MVar LLDict -> IO ()
>     pinger delay ch sync = do
>         labelThread "LinkList.Ping: sleeping"
>         threadDelay (delay * 1000 * 1000)
>         list <- takeMVar sync
>         -- print "pinging..."

Weed out all linkers whose count is at zero and advance the flags:
Alive -> Pending, Pending -> Cleanup (and notify), Cleanup -> dropped.
- could be changed to modify the existing FM instead of creating a new one.

>         let sweepPid pid (flag,cs) (ok,failed) =
>               let cs' = filter (\(count,_) -> count /= 0) cs
>               in case flag of
>                    Alive   -> (Map.insert pid (Pending,cs') ok, failed)
>                    Pending -> (Map.insert pid (Cleanup,cs') ok, (pid,cs'):failed)
>                    Cleanup -> (ok,failed)
>             sweepNode node pids (x,z) =
>               let (ok,failed) = Map.foldWithKey sweepPid (Map.empty, []) pids
>               in (Map.insert node ok x, (node,failed):z)
>             (checking,notifying) = Map.foldWithKey sweepNode (Map.empty, []) list
>         -- print (show ((snd list')))

Notify failed pings.  IMO it's too fine-grained to use forkIO.

>         labelThread "LinkList.Ping: notifying"
>         mapM_ (\(n,ps) ->
>                  mapM_ (\(p,cs) -> mapM_ (notify dict (Pid n p)) cs) ps)
>               notifying
>         putMVar sync checking

Ping remaining items: one thread iterates over all entries.
- it shouldn't be necessary to forkIO here, we don't have anything else
  to do anyway.  This way we don't run into trouble with two overlapping
  passes if the first one takes too long.

XXX Collect all Pids and send *one* message

>         mapFMM_ (\node pids _ ->
>                    mapFMM_ (\pid _ _ -> do
>                               let msg = Ping (Pid this 0) pid
>                               if node == this

Ah, we're responsible for this one!  Send the Ping-request to myself;
on remote hosts, use the regular way:

>                                 then writeChan ch msg
>                                 else tun (plainToNode node) ("!" ++ (show msg))
>                            ) pids
>                 ) checking
>         pinger delay ch sync

This one is passed to the PortListener on Distributed.lhs initialization.
It parses an incoming link-level message and queues it for the main loop;
unparsable input is reported and dropped.  Always returns False.

> -- rcvLinkLevelMsg :: Chan Msgs -> String -> IO Bool
> rcvLinkLevelMsg ch _ _ _ _ msg = do
>     case reads msg of
>       [] -> do
>         print $ "dhs: LinkLevel message not understood: " ++ msg ++ "\n"
>         -- XXX halt

Take the first parse; an ambiguous parse must not crash the listener.

>       ((v,_):_) -> writeChan ch v
>     return False

Run the callback for every key/value pair, in ascending key order.  The
third callback argument is a historical leftover (it used to be the fold
accumulator) and is always a no-op action; callers ignore it.

> mapFMM_ :: Monad m => (k -> a -> m () -> m b) -> Map.Map k a -> m ()
> mapFMM_ f fm = mapM_ (\(k,v) -> f k v (return ())) (Map.toList fm)