> module Tunnel (Tunnel, bang, rsend, refresh, findOrAllocTunnel, addConnection, stop, startup,
>                TunHandle, h,sem,writeH,readH,closeH, tunOpF, thisMV, Eval)
> where

The Tunnel-module is necessary because during development of the
PerfTest-suite it turned out that Distributed Haskell suffers from the
well-known TIME_WAIT problem when generating lots of short
TCP-connections. Cf. "TCP/IP Illustrated Vol. I", R. Stevens.

WARNING!!! Function names are chosen badly, just compare alloc & delete...
We could use MVar String instead of semaphores.

> import DHSCore
> import ObjIO

#ifndef __HUGS__

> import NetTCP as Network (Net,connect)
> import qualified NetTCP as Network
> import Data.Map (Map,empty)
> import qualified Data.Map as Map

#else

> import NetFIFO as Network (Net)
> import qualified NetFIFO as Network
> import FiniteHugs as FiniteMap

#endif

> import IO
> -- import Control.Exception -- (IOError (..), IOErrorType (..) )
> import Control.Exception (Exception (..),throw )
> import qualified Control.Exception as Exception
> import List
> import Maybe
> import Monad
> import Network.BSD

#ifdef CONCDEBUG

> import ConcurrentDebug as Concurrent

#else

> import ConcurrentDebugLess as Concurrent

#endif

Eval is the type of a per-message callback: it receives a handle-like
object, the line that was read, and a host name and port, and returns a
Bool. The listener loop in tunOpF stops as soon as its callback returns
True (presumably that callback is an Eval; the signature of tunOpF is
only given in a comment).

> type Eval a = (a -> String -> Network.Hostname -> Port -> IO Bool)

MuxMsg is what the helper thread forked by readH hands back to the
reading thread: Data carries the text that was read, Exception carries
an IOException raised by the read, and Close is sent for every other
exception caught in the helper thread.

> data MuxMsg = Data String | Exception Exception.Exception | Close

The MVar is used to grant exclusive access to the handle. However, MVars
usually don´t guarantee that processes are woken up in the same order
they suspended. OTOH, I´m not quite sure if this is an issue here.

> data TunHandle = HandleR {
>         mvar :: MVar TunDict,           -- dictionary this handle is registered in
>         node :: PNode,                  -- remote node; key of the dictionary entry
>         sem  :: MVar (Maybe ThreadId),  -- per-handle lock; Just pid while readH has a helper thread pending
>         h    :: Handle                  -- underlying connection handle
>     }

> data Tunnel = TunnelR {
>         gcMVar :: MVar TunDict,         -- dictionary of open tunnels, shared with the collector thread
>         net    :: Network.Net,          -- network object used to open new connections
>         myPort :: Port                  -- own port; sent to the peer when a tunnel is opened
>     }

> class TunnelC a where
>     bang :: a -> Node -> String -> IO ()
>     rsend :: a -> Hostname -> String -> IO ()
>     refresh :: a -> PNode -> IO ()
>     addConnection :: a -> Node -> Handle -> IO TunHandle
>     findOrAllocTunnel :: a -> Node -> IO (Maybe TunHandle)
>     thisMV :: a -> MVar TunDict
>     stop :: a -> IO ()
>     -- tunOp :: a -> IOObj b => (PNode -> IO ()) -> Network.Hostname -> Port -> b -> Eval b -> String -> IO Bool,

Every method simply forwards to the corresponding free function below,
supplying the fields of the Tunnel record.

NOTE(review): in the source as received the send operator had lost its
name and was spelled (), which is not a valid binding. It is restored
here as (<!>); the name is a reconstruction. The operator is not
exported, so only this module depends on the spelling.

> instance TunnelC Tunnel where
>     refresh tun = refreshF (gcMVar tun)
>     bang tun = (<!>) (net tun) (myPort tun) (gcMVar tun) True
>     rsend tun = remoteSend (net tun) (myPort tun) (gcMVar tun)
>     addConnection tun = addConnectionExt (gcMVar tun)
>     findOrAllocTunnel tun (Hostname host,port) =
>         findOrAllocTunnelF (net tun) (gcMVar tun) (myPort tun) (host,port)
>     thisMV tun = gcMVar tun
>     stop tun = do
>         let mv = gcMVar tun
>         dic <- takeMVar mv
>         -- Put the emptied dictionary back BEFORE closing the handles.
>         -- internalClose takes the per-handle lock, while writeH and readH
>         -- hold that lock when they call refreshF on the dictionary.
>         -- Closing with the dictionary still taken would invert that lock
>         -- order and can deadlock. The collector releases early as well.
>         putMVar mv Map.empty
>         mapM_ (\ (_,(ha,_))-> (internalClose ha True (return ()))) (Map.toList dic)

Flag used by the collector. Used entries were touched since the last
collector pass, Unused entries were not and get closed on the next pass.
Active is not constructed anywhere in this module.

> data GCFlag = Used | Unused | Active deriving Eq
> type FMEntry = (TunHandle, GCFlag)
> type TunDict = Map PNode FMEntry

First character of the message that opens a tunnel, and of the message
that asks the peer to close it.

> tunOpenChar = '=' -- Dangerous. Not exported anyway...
> tunCloseChar = '@'

Wrap a raw handle into a TunHandle for node n. A fresh lock is created
(initially Nothing, i.e. no reader thread pending) and labelled with the
node. The handle is NOT entered into the dictionary here.

> handleToTunF :: MVar TunDict -> Handle -> PNode -> IO TunHandle
> handleToTunF mv h n = do
>     sem <- newMVar Nothing
>     labelMVar sem (show n)
>     let handle = HandleR {
>             mvar = mv,
>             node = n,
>             sem = sem,
>             h = h
>             }
>     return handle

refreshF needs to be external for now.

It marks the entry for hp as Used if it is currently Unused; every other
case leaves the dictionary untouched.

> refreshF mv hp = do
>     dic <- takeMVar mv
>     let newDic = case (Map.lookup hp dic) of
>                     Just (fh, Unused) -> Map.insert hp (fh, Used) dic
>                     _ -> dic
>     putMVar mv newDic

Wrapper-functions. I´m never going to touch this stuff again.

The ...Left variants operate on the MVar (take, modify, put back); the
...Right variants are the pure dictionary operations.

> delConnectionLeft mv node = do
>     dic <- takeMVar mv
>     let newDic = delConnectionRight dic node
>     putMVar mv newDic

> delConnectionRight dic node =
>     Map.delete node dic

> addConnectionLeft mv node handle = do
>     dic <- takeMVar mv
>     let newDic = addConnectionRight dic node handle
>     putMVar mv newDic

New entries start out flagged Used.

> addConnectionRight dic node h =
>     Map.insert node (h,Used) dic

Wrap an already existing handle h for the given node, register it in the
dictionary and return the new TunHandle.

> addConnectionExt mv (Hostname rhost,rport) h = do
>     let node = (rhost,rport)
>     handle <- handleToTunF mv h node
>     addConnectionLeft mv node handle
>     return handle

We transmit our port# so the remote end can see which (DHS-) node we´re from.

findOrAllocTunnelF returns the registered tunnel to (rname,rport),
marking it Used, or opens a new connection, sends tunOpenChar followed
by our own port, and registers the result. It returns Nothing when the
connect or the opening write fails with an IOError. The dictionary MVar
is held for the whole call, including the connect.

NOTE(review): the lookup has no alternative for an entry flagged Active;
nothing in this module creates such an entry.

> findOrAllocTunnelF :: Net -> MVar TunDict -> Port -> PNode -> IO (Maybe TunHandle)
> findOrAllocTunnelF net mv me (rname,rport) = do
>     let hp = (rname,rport)
>     dic <- takeMVar mv
>     (res, newDic) <- case (Map.lookup hp dic) of
>         Nothing ->
>             return (Nothing, dic)
>         Just (fh, Used) ->
>             return (Just fh, dic) -- shave off a few cycles
>         Just (fh, Unused) ->
>             return (Just fh, Map.insert hp (fh, Used) dic)
>     (res',newDic') <- case res of
>         Just fh -> return (res,newDic)
>         Nothing -> do
>             -- print "nada!"
>             catch (do
>                     h <- (net#connect) rname rport
>                     -- Once connected, a failing handshake must not leak
>                     -- the handle: close it, then let the outer handler
>                     -- report the failure as Nothing.
>                     catch (do
>                             (h#writeH) (tunOpenChar:(show me))
>                             handle <- handleToTunF mv h (rname,rport)
>                             return (Just handle,addConnectionRight newDic hp handle)
>                         )
>                         (\e -> do
>                             catch (h#closeH) (\_ -> return ())
>                             ioError e
>                         )
>                 )
>                 (\_ -> return (Nothing, newDic))
>     putMVar mv newDic'
>     return res'

All operations on a TunHandle serialise on its sem lock. Note the lock
order used throughout: first the handle lock, then the dictionary MVar.

> instance IOObj TunHandle where

writeH writes msg to the underlying handle and refreshes the dictionary
entry. On an IOError the tunnel is closed and unregistered, and the
error is re-raised to the caller.

>     writeH th msg = do
>         let s = th#sem
>         let ha = th#h
>         let mv = th#mvar
>         let n = th#node
>         t <- takeMVar s
>         catch (do
>                 (ha#writeH) msg
>                 refreshF mv n
>                 putMVar s t
>             )
>             (\ e -> do

Remove connection from database. Everyone still using the handle is free
to shoot himself in the foot, but we don´t want any more processes using
this entry:

>                 -- delConnectionLeft mv node
>                 putMVar s t
>                 internalClose th False (delConnectionLeft mv n)
>                 ioError e
>             )

readH performs the blocking read in a helper thread, so the handle lock
is not held while waiting. The id of the helper is stored in the lock,
which lets internalClose kill it. The helper reports back through temp.

>     readH th = do
>         let s = th#sem
>         let ha = th#h
>         let mv = th#mvar
>         let n = th#node
>         res <- takeMVar s
>         when (isJust res) (print "Tunnel: sanity check failed: isJust!")
>         res <- hIsOpen ha
>         unless res (print "Tunnel: sanity check failed: hIsOpen!")
>         temp <- newEmptyMVar
>         -- Label the freshly created MVar. The original labelled mv here,
>         -- which renamed the global dictionary MVar instead.
>         labelMVar temp "tempmv"
>         pid <- forkIOLabel "tempreader" (reader temp ha)
>         putMVar s (Just pid)
>         msg <- takeMVar temp
>         res <- case msg of
>             Data m -> do
>                 res <- takeMVar s

Check whether it is still ours and update global dict.

>                 when (isJust res) (refreshF mv n)
>                 putMVar s Nothing
>                 return m
>             Close -> do
>                 print "Kill!"
>                 return (tunCloseChar:"\n")

Everything else is done by the thread calling closeH.

>             Exception e -> do
>                 print "Exception!"
>                 _ <- takeMVar s

Mark connection as broken. Please do not press this button again:

>                 delConnectionLeft mv n
>                 putMVar s Nothing
>                 throw e
>         return res
>       where
>             -- Helper thread body: read once and report the outcome.
>             -- An IOException is forwarded; any other exception is
>             -- reported as Close.
>             reader mv h = do
>                 -- catchAllIO (do
>                 Exception.catch (do
>                         res <- h#readH
>                         putMVar mv (Data res)
>                     ) ( \e -> case e of
>                         Exception.IOException _ -> putMVar mv (Exception e)
>                         _ -> putMVar mv Close)

closeH announces the close on stdout, then closes the tunnel (sending
the close message to the peer) and removes it from the dictionary.

>     closeH th = do
>         print $ "closing " ++ (show (th#node))
>         internalClose th True (delConnectionLeft (th#mvar) (th#node))

isOpenH queries the underlying handle while holding the handle lock.

>     isOpenH th = do
>         let s = th#sem
>         let ha = th#h
>         t <- takeMVar s
>         res <- isOpenH ha
>         putMVar s t
>         return res

internalClose is a subset of the above:

It takes the handle lock, runs f (callers pass either the removal from
the dictionary or a no-op), kills a pending reader thread if there is
one, optionally (eot) sends the close message to the peer, ignoring
IOErrors, closes the underlying handle and releases the lock with
Nothing.

> internalClose th eot f = do
>     let s = th#sem
>     let ha = th#h
>     pid <- takeMVar s

We must obtain the lock on the filehandle first before we modify the
global mvar! Otherwise someone might call refresh in between.

>     f
>     case pid of
>         Just pid -> do
>             killThread pid
>         otherwise -> return ()
>     when eot (catch ((ha#writeH) (tunCloseChar:"\n")) (\ _ -> return ()))
>     ha#closeH
>     putMVar s Nothing

startup creates an empty dictionary, forks the collector thread on it
and returns the Tunnel record. delay is the collector period in seconds.

> startup :: Net -> Port -> Int -> IO Tunnel
> startup net myPort delay = do
>     gcMVar <- newMVar Map.empty
>     labelMVar gcMVar "Tunnel.GC"
>     forkIOLabel "Tunnel.collector" (collector net gcMVar delay)
>     let tun = TunnelR {
>             gcMVar = gcMVar,
>             myPort = myPort,
>             net = net
>             }
>     return tun

This is the garbage-collector thread. It waits ´delay´ seconds,
synchronizes and takes one turn. Then it puts back the modified
dictionary.

> collector :: Net -> MVar TunDict -> Int -> IO ()
> collector net gcMVar delay = do

Hugs doesn´t have threadDelay. Need workaround!

#ifndef __HUGS__

>     threadDelay (delay * 1000 * 1000)

#else

>     yield

#endif

>     dic <- takeMVar gcMVar
>     var <- myGetVar "TunDebug"
>     when (isJust var) (putStrLn "gc")

As a shortcut, we put back the MVar as soon as we have sorted out the
chaff. Switch the flag on still valid entries!

NOTE(review): the case below has no alternative for Active; nothing in
this module creates such an entry.

>     let (unused,used) = Map.foldWithKey (\ node entry (uu,u) ->
>             case entry of
>                 (_,Unused) -> ((node,entry):uu,u)
>                 (fh,Used) -> (uu, Map.insert node (fh,Unused) u)) ([],Map.empty) dic
>     putMVar gcMVar used
>     mapM_ (\ (_,(ha,_)) -> do
>             var <- myGetVar "TunDebug"
>             when (isJust var) (putStrLn "meep")
>             internalClose ha True (return ())
>         ) unused
>     collector net gcMVar delay

The network stuff. We need two things: The normal send operator (<!>)
and ´sendRemote´:

This is the function which really does use the network to deliver a
message. You can plug in here any other libraries. You might want to
change ´remoteSendTo´, too, though it really shouldn't matter.

The Bool argument says whether this is the first attempt: if the write
fails on the first attempt the send is retried exactly once, otherwise
"fail" is printed and the IOError is re-raised. When no tunnel can be
established only a warning is printed.

> (<!>) :: Net -> Port -> MVar TunDict -> Bool -> Node -> String -> IO ()
> (<!>) net mp mv first
>         r@(Hostname rname,rport) msg = do
>     let rnode = (rname,rport)
>     -- putStrLn ("Trying to deliver " ++ msg ++ " to " ++ (show rnode))
>     tun <- findOrAllocTunnelF net mv mp rnode
>     case tun of
>         Nothing -> do
>             putStrLn $ "dhs: WARNING! Unable to establish TCP-tunnel to " ++ (show rnode) ++ "!"
>             -- XXX nodeFailed
>         Just ha -> do
>             catch (do
>                     (ha#writeH) msg
>                 )
>                 (\ e -> do
>                     print $ "tun.e1: " ++ (show e)

writeH already did all the necessary things:

>                     internalClose ha False (return ())

Retry once in case we hit a dead handle:

>                     if first
>                         then (<!>) net mp mv False r msg
>                         else print "fail" >> ioError e
>                 )

remoteSend delivers msg to the given host on the fixed port
distHaskellPortNr, as a first attempt.

> remoteSend :: Net -> Port -> MVar TunDict -> Hostname -> String -> IO ()
> remoteSend net mp mv host msg = do
>     (<!>) net mp mv True (host,distHaskellPortNr) msg

Cast Handle To Tunnel-handle, add to Tunnel-dict and continue.

tunOpF reads lines from handle until the tunnel is closed and returns
True when the loop ends. A line starting with the open or the close
character ends the loop, as does a failed read; every other line is
passed to ev, which ends the loop by returning True. A read failure
other than end-of-file is printed and reported through nodeFailed.
The mv argument is not used in the body.

> -- tunOpF :: IOObj a => MVar TunDict -> (PNode -> IO ()) -> Network.Hostname -> Port -> a
> --           -> Eval a -> String -> IO Bool
> tunOpF mv nodeFailed host port handle ev msg = do

Remember that we don't have the correct remote port yet! We only have
the port of the connection which has got nothing to do with the port
required for identifying a node. So we rely on the remote to transmit
its port. It's used in the nodeFailed below.

NOTE(review): read is partial. msg comes from the peer, and a malformed
port only fails when nPort is first demanded, i.e. inside the nodeFailed
call.

>     let nPort = ((read msg) :: Port)

XXX We can't use the connection in the reverse direction as we don't
know if someone's listening on the end. Adding this functionality to
findOrAllocTunnelF proved to be more difficult than expected!

>     tunportListener handle (host,nPort)
>   where
>         -- tunportListener :: a -> PNode -> IO Bool
>         tunportListener handle node@(host,nPort) = do
>             res <- catch (do
>                     liftM Just $ handle#readH
>                 )
>                 (\e -> do
>                     unless (isEOFError e) (do
>                             putStrLn ("hGetLine in TCP-Tunnel failed, reason:\n"++(show e))
>                             nodeFailed node
>                         )
>                     return Nothing
>                 )
>             case res of
>                 Just line -> do

We intercept the close-request for the tunnel.

>                     case line of
>                         '=':_ -> do
>                             putStrLn "TCP-Tunnel insanity"
>                             return True
>                         '@':msg' -> do
>                             -- putStrLn "TCP-Tunnel closing."
>                             return True
>                         _ -> do

Pass the real port number to eval and loop eventually.

>                             closeTun <- ev handle line host port
>                             if closeTun
>                                 then return True
>                                 else tunportListener handle node
>                 Nothing -> return True