module MinDistributed where import IO import Concurrent import qualified Exception (catchAllIO) import DIOMonad import DHSCore hiding (Pid (..) ) import Dictionary data Pid = Pid PidInt data DIOState msg = DIOST { dict :: Dictionary, selfPid :: PidInt, mailbox :: [msg], haltVar :: MVar (), inCh :: StringChan} type DIO msg a = Dist (DIOState msg) a class (Read a,Show a) => Serialize a where serialize :: a -> String serialize = show deserialize :: String -> [(a,String)] deserialize = reads start :: DIO msg () -> IO () start f = do putStr " Dictionary" dict <- Dictionary.startup haltMVar <- newEmptyMVar MinDistributed.runDist (DIOST { dict = dict, haltVar = haltMVar, mailbox = undefined, inCh = undefined, selfPid = undefined }) f takeMVar haltMVar runDist :: DIOState msg -> (Dist (DIOState msg') ()) -> IO Pid runDist self f = do let dic = self#dict ch <- newChan n <- (dic#allocate) ch let me = Pid n forkIO (do catch (DIOMonad.runDist (self { selfPid = n, mailbox = [], inCh = ch }) f) (\e -> do putStrLn $ "Error: " ++ (show e) ) -- Ab hier ist "f" zu Ende -- Beim Dictionary abmelden: (dic#delete) n ) return me halt :: DIO msg () halt = do halt <- gets haltVar proc $ putMVar halt () () :: (Serialize msg) => Pid -> msg -> DIO msg msg () pid@(Pid p) v = do st <- get _ <- proc $ ((dict st)#sendToLocal) p (serialize v) return v self :: DIO a Pid self = do pid <- gets selfPid -- interner Zähler return (Pid pid) spawn :: DIO a () -> DIO b Pid spawn f = do st <- get me <- proc $ MinDistributed.runDist st f return me receive :: (Serialize msg) => (msg -> (DIO msg b)) -> DIO msg b receive f = do st <- get (que',r') <- proc $ receive' (mailbox st) f (expr,mbox) <- case r' of Just e -> return (e, que') Nothing -> do -- read from channel (que'',r'') <- receiveChan (inCh st) f que' return (r'',que'') put st {mailbox = mbox} -- put back modified mbox expr -- excute RHS where receiveChan :: (Serialize a) => StringChan -> (a -> b) -> [a] -> DIO a ([a],b) receiveChan ch f que = readAndParse ch f que receiveChan id readAndParse :: (Serialize a) => StringChan -> (a -> b) -> [a] -> (StringChan -> (a -> b) -> [a] -> DIO a ([a],c)) -> (b -> c) -> DIO a ([a],c) readAndParse ch f que rcvFun m = do str <- proc $ readChan ch let u = deserialize str case u of [] -> do proc $ IO.hPutStrLn stderr $ "dhs: Received message has wrong type:\n" ++ str var <- proc $ myGetVar "DHS_NOSTRICTPARSE" case var of Nothing -> do halt; proc yield; error "halted"; rcvFun ch f que -- There´s no other way out of here?! Just _ -> rcvFun ch f que [(v,_)] -> do x <- proc $ trustMeElseFail f v case x of Nothing -> rcvFun ch f (que++[v]) Just r -> return (que,m r) receive' :: (Serialize msg) => [msg] -> (msg -> b) -> IO ([msg],Maybe b) receive' [] _ = return ([],Nothing) receive' (v:vs) f = do x <- trustMeElseFail f v case x of Just r -> return (vs,Just r) Nothing -> do (vs',s) <- receive' vs f return (v:vs',s) trustMeElseFail :: (Serialize a) => (a -> b) -> a -> IO (Maybe b) trustMeElseFail f v = do Exception.catchAllIO (let r = f v in r `seq` (return (Just r))) (\_ -> (return Nothing))