Note that there are some explanatory texts on larger screens.

plurals
  1. POOne processing conduit, 2 IO sources of the same type
    text
    copied!<p>In my <code>GHC Haskell</code> application utilizing stm, network-conduit and conduit, I have a strand for each socket which is forked automatically using <code>runTCPServer</code>. Strands can communicate with other strands through the use of a broadcasting TChan.</p> <p>This showcases how I would like to set up the conduit "chain":</p> <p><img src="https://i.stack.imgur.com/E9zNx.png" alt="enter image description here"></p> <p>So, what we have here is two sources (each bound to helper conduits which) which produce a <code>Packet</code> object which <code>encoder</code> will accept and turn into <code>ByteString</code>, then send out the socket. I've had a great amount of difficulty with the efficient (performance is a concern) fusing of the two inputs.</p> <p>I would appreciate if somebody could point me in the right direction.</p> <hr> <p>Since it would be rude of me to post this question without making an attempt, I'll put what I've previously tried here;</p> <p>I've written/cherrypicked a function which (blocking) produces a Source from a TMChan (closeable channel);</p> <pre><code>-- | Takes a generic type of STM chan and, given read and close functionality, -- returns a conduit 'Source' which consumes the elements of the channel. chanSource :: (MonadIO m, MonadSTM m) =&gt; a -- ^ The channel -&gt; (a -&gt; STM (Maybe b)) -- ^ The read function -&gt; (a -&gt; STM ()) -- ^ The close/finalizer function -&gt; Source m b chanSource ch readCh closeCh = ConduitM pull where close = liftSTM $ closeCh ch pull = PipeM $ liftSTM $ readCh ch &gt;&gt;= translate translate = return . maybe (Done ()) (HaveOutput pull close) </code></pre> <p>Likewise, a function to transform a Chan into a sink;</p> <pre><code>-- | Takes a stream and, given write and close functionality, returns a sink -- which wil consume elements and broadcast them into the channel chanSink :: (MonadIO m, MonadSTM m) =&gt; a -- ^ The channel -&gt; (a -&gt; b -&gt; STM()) -- ^ The write function -&gt; (a -&gt; STM()) -- ^ The close/finalizer function -&gt; Sink b m () chanSink ch writeCh closeCh = ConduitM sink where close = const . liftSTM $ closeCh ch sink = NeedInput push close write = liftSTM . writeCh ch push x = PipeM $ write x &gt;&gt; return sink </code></pre> <p>Then mergeSources is straightforward; fork 2 threads (which I really don't want to do, but what the heck) which can put their new items into the one list which I then produce a source of;</p> <pre><code>-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns -- a source which consumes the elements of the channel. mergeSources :: (MonadIO m, MonadBaseControl IO m, MonadSTM m) =&gt; [Source (ResourceT m) a] -- ^ The list of sources -&gt; ResourceT m (Source (ResourceT m) a) mergeSources sx = liftSTM newTMChan &gt;&gt;= liftA2 (&gt;&gt;) (fsrc sx) retn where push c s = s $$ chanSink c writeTMChan closeTMChan fsrc x c = mapM_ (\s -&gt; resourceForkIO $ push c s) x retn c = return $ chanSource c readTMChan closeTMChan </code></pre> <p>While I was successful in making these functions typecheck, I was unsuccessful in getting any utilization of these functions to typecheck;</p> <pre><code>-- | Helper which represents a conduit chain for each client connection serverApp :: Application SessionIO serverApp appdata = do use ssBroadcast &gt;&gt;= liftIO . atomically . dupTMChan &gt;&gt;= assign ssBroadcast -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata mergsrc $$ protocol $= encoder =$ appSink appdata where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan mergsrc = mergeSources [appSource appdata $= decoder, chansrc] -- | Structure which holds mutable information for clients data SessionState = SessionState { _ssBroadcast :: TMChan Packet -- ^ Outbound packet broadcast channel } makeLenses ''SessionState -- | A transformer encompassing both SessionReader and SessionState type Session m = ReaderT SessionReader (StateT SessionState m) -- | Macro providing Session applied to an IO monad type SessionIO = Session IO </code></pre> <p>I see this method as being flawed anyhow -- there are many intermediate lists and conversions. This can not be good for performance. Seeking guidance.</p> <hr> <p>PS. From what I can understand, this is not a duplicate of; <a href="https://stackoverflow.com/questions/15594556/fusing-conduits-with-multiple-inputs/15597009#15597009">Fusing conduits with multiple inputs</a> , as in my situation both sources produce the same type and I don't care from which source the <code>Packet</code> object is produced, as long as I'm not waiting on one while another has objects ready to be consumed.</p> <p>PPS. I apologize for the usage (and therefore requirement of knowledge) of Lens in example code.</p>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious you may find further information in the browser console, which is accessible through the devtools (F12).

Reload