If I understand correctly, you've got some realtime source of data that you want to multiplex to multiple sockets. You've got a single "source" pipe hooked up to whatever's producing your data, and you've got a "destination" pipe for each socket over which you wish to send the data. What you're doing is using `tee()` to copy data from the source pipe to each of the destination pipes and `splice()` to copy it from the destination pipes to the sockets themselves.

The fundamental issue you're going to hit here is if one of the sockets simply can't keep up - if you're producing data faster than you can send it, then you're going to have a problem. This isn't related to your use of pipes, it's just a fundamental issue. So, you'll want to pick a strategy to cope in this case - I suggest handling this even if you don't expect it to be common, as these things often come up to bite you later. Your basic choices are either to close the offending socket, or to skip data until it's cleared its output buffer - the latter choice might be more suitable for audio/video streaming, for example.

The issue which *is* related to your use of pipes, however, is that on Linux the size of a pipe's buffer is somewhat inflexible. It defaults to 64K since Linux 2.6.11 (the `tee()` call was added in 2.6.17) - see the [pipe manpage](http://www.kernel.org/doc/man-pages/online/pages/man7/pipe.7.html). Since 2.6.35 this value can be changed via the `F_SETPIPE_SZ` option to `fcntl()` (see the [fcntl manpage](http://www.kernel.org/doc/man-pages/online/pages/man2/fcntl.2.html)) up to the limit specified by `/proc/sys/fs/pipe-max-size`, but the buffering is still more awkward to change on demand than a dynamically allocated scheme in user space would be. This means that your ability to cope with slow sockets will be somewhat limited - whether this is acceptable depends on the rate at which you expect to receive and be able to send data.
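For illustration, here's a minimal sketch of enlarging a pipe's buffer with `F_SETPIPE_SZ`. It needs Linux 2.6.35 or later and `_GNU_SOURCE`; the 1 MiB figure is just an example, and note that the kernel rounds the request up to a power of two and caps unprivileged processes at `/proc/sys/fs/pipe-max-size`:

```c
/* Minimal sketch: enlarge a pipe's kernel buffer with F_SETPIPE_SZ.
 * Requires Linux >= 2.6.35; _GNU_SOURCE exposes the F_*PIPE_SZ constants. */
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
    int fds[2];
    if (pipe(fds) == -1) {
        perror("pipe");
        return 1;
    }

    /* Ask for a 1 MiB buffer; on success the call returns the size
     * actually set, which may be larger than requested. */
    int new_size = fcntl(fds[1], F_SETPIPE_SZ, 1024 * 1024);
    if (new_size == -1) {
        perror("fcntl(F_SETPIPE_SZ)");
        return 1;
    }

    printf("pipe buffer is now %d bytes\n", new_size);

    close(fds[0]);
    close(fds[1]);
    return 0;
}
```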
Assuming this buffering strategy is acceptable, you're correct in your assumption that you'll need to track how much data each destination pipe has consumed from the source, and it's only safe to discard data which all destination pipes have consumed. This is somewhat complicated by the fact that `tee()` doesn't have the concept of an offset - you can only copy from the start of the pipe. The consequence of this is that you can only copy at the speed of the slowest socket, since you can't use `tee()` to copy to a destination pipe until some of the data has been consumed from the source, and you can't do *this* until all the sockets have the data you're about to consume.

How you handle this depends on the importance of your data. If you really need the speed of `tee()` and `splice()`, and you're confident that a slow socket will be an extremely rare event, you could do something like this (I've assumed you're using non-blocking IO and a single thread, but something similar would also work with multiple threads; there's a compressed sketch of this loop below):

1. Make sure all pipes are non-blocking (use `fcntl(d, F_SETFL, O_NONBLOCK)` to make each file descriptor non-blocking).
2. Initialise a `read_counter` variable for each destination pipe to zero.
3. Use something like [epoll](http://www.kernel.org/doc/man-pages/online/pages/man7/epoll.7.html) to wait until there's something in the source pipe.
4. Loop over all destination pipes where `read_counter` is zero, calling `tee()` to transfer data to each one. Make sure you pass `SPLICE_F_NONBLOCK` in the flags.
5. Increment `read_counter` for each destination pipe by the amount transferred by `tee()`. Keep track of the lowest resultant value.
6. If this lowest value is non-zero, discard that amount of data from the source pipe (using a `splice()` call with a destination opened on `/dev/null`, for example). After discarding, subtract the amount discarded from `read_counter` on *all* the pipes (since this was the lowest value, this cannot make any of them negative).
7. Repeat from step **3**.

Note: one thing that's tripped me up in the past is that `SPLICE_F_NONBLOCK` affects whether the `tee()` and `splice()` operations on the pipes are non-blocking, while the `O_NONBLOCK` you set with `fcntl()` affects whether the interactions with other calls (e.g. `read()` and `write()`) are non-blocking. If you want everything to be non-blocking, set both. Also remember to make your sockets non-blocking, or the `splice()` calls that transfer data to them might block (unless that's what you want, if you're using a threaded approach).

As you can see, this strategy has a major problem - as soon as one socket blocks up, everything halts: the destination pipe for that socket will fill up, and then the source pipe will become stagnant. So, if you reach the stage where `tee()` returns `EAGAIN` in step **4**, you'll want to either close that socket, or at least "disconnect" it (i.e. take it out of your loop) so that you don't write anything else to it until its output buffer is empty. Which you choose depends on whether your data stream can recover from having bits of it skipped.
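To make the bookkeeping concrete, here's the promised compressed sketch of steps 4-6 in C. It assumes a single thread, that every descriptor has already been made non-blocking, that `devnull_wr` is an open descriptor on `/dev/null`, and that each destination pipe is drained into its socket elsewhere (e.g. by `splice()` when the socket reports writable). The `dest_t` struct and `fan_out_once()` name are purely illustrative, not anything from your code:

```c
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdint.h>
#include <unistd.h>

typedef struct {
    int    pipe_wr;       /* write end of this destination's pipe         */
    size_t read_counter;  /* bytes at the source head already tee()d here */
} dest_t;

/* One pass of steps 4-6; call it whenever epoll reports data in src_rd.
 * Returns -1 on a fatal error. */
int fan_out_once(int src_rd, int devnull_wr, dest_t *dest, size_t n)
{
    size_t lowest = SIZE_MAX;

    /* Steps 4-5: tee() into every destination pipe that is caught up
     * (read_counter == 0), then note the lowest resulting counter. */
    for (size_t i = 0; i < n; i++) {
        if (dest[i].read_counter == 0) {
            ssize_t copied = tee(src_rd, dest[i].pipe_wr,
                                 INT_MAX, SPLICE_F_NONBLOCK);
            if (copied > 0)
                dest[i].read_counter += (size_t)copied;
            else if (copied == -1 && errno != EAGAIN)
                return -1;
            /* EAGAIN here means this destination pipe is full, i.e. a
             * slow socket - the point at which you'd disconnect or drop
             * that destination. */
        }
        if (dest[i].read_counter < lowest)
            lowest = dest[i].read_counter;
    }

    /* Step 6: everything below 'lowest' has reached every destination,
     * so discard it from the source pipe into /dev/null and adjust the
     * counters (none can go negative, since 'lowest' was the minimum). */
    if (n > 0 && lowest > 0) {
        ssize_t gone = splice(src_rd, NULL, devnull_wr, NULL,
                              lowest, SPLICE_F_NONBLOCK);
        if (gone > 0) {
            for (size_t i = 0; i < n; i++)
                dest[i].read_counter -= (size_t)gone;
        } else if (gone == -1 && errno != EAGAIN) {
            return -1;
        }
    }
    return 0;
}
```

A caller would `epoll_wait()` for readability on the source pipe and invoke this once per wakeup, alongside whatever drains the destination pipes into their sockets.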
If you want to cope with network latency more gracefully, then you're going to need to do more buffering, and this is going to involve either user-space buffers (which rather negates the advantages of `tee()` and `splice()`) or perhaps a disk-based buffer. Disk-based buffering will almost certainly be significantly slower than user-space buffering, and hence not appropriate given that presumably you want a lot of speed since you've chosen `tee()` and `splice()` in the first place, but I mention it for completeness.

One thing that's worth noting if you end up inserting data from user space at any point is the `vmsplice()` call, which can perform "gather output" from user space into a pipe, in a similar way to the `writev()` call. This might be useful if you're doing enough buffering that you've split your data among multiple different allocated buffers (for example if you're using a pool allocator approach) - there's a tiny sketch of this at the end of this answer.

Finally, you could imagine swapping sockets between the "fast" scheme of using `tee()` and `splice()` and, if they fail to keep up, moving them on to slower user-space buffering. This is going to complicate your implementation, but if you're handling large numbers of connections and only a very small proportion of them are slow, then you're still somewhat reducing the amount of copying to user space that's involved. However, this would only ever be a short-term measure to cope with transient network issues - as I said originally, you've got a fundamental problem if your sockets are slower than your source. You'd eventually hit some buffering limit and need to skip data or close connections.

Overall, I would carefully consider why you need the speed of `tee()` and `splice()` and whether, for your use case, simply buffering in user-space memory or on disk would be more appropriate. If you're confident that the speeds will always be high, however, and limited buffering is acceptable, then the approach I outlined above should work.

Also, one thing I should mention is that this will make your code extremely Linux-specific - I'm not aware of these calls being supported in other Unix variants. The `sendfile()` call is more restricted than `splice()`, but might be rather more portable. If you really want things to be portable, stick to user-space buffering.

Let me know if there's anything I've covered which you'd like more detail on.
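To illustrate the `vmsplice()` point above, here's a minimal sketch of "gather output" from two separate user-space buffers into a pipe in a single call, much like `writev()` - the buffer contents are just placeholders:

```c
/* Minimal sketch: vmsplice() gathers several user-space buffers into a
 * pipe in one call. */
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>
#include <unistd.h>

int main(void)
{
    int fds[2];
    if (pipe(fds) == -1) {
        perror("pipe");
        return 1;
    }

    char header[]  = "header ";
    char payload[] = "payload\n";
    struct iovec iov[2] = {
        { .iov_base = header,  .iov_len = strlen(header)  },
        { .iov_base = payload, .iov_len = strlen(payload) },
    };

    /* Push both buffers into the pipe in a single call. */
    ssize_t pushed = vmsplice(fds[1], iov, 2, 0);
    if (pushed == -1) {
        perror("vmsplice");
        return 1;
    }
    printf("pushed %zd bytes into the pipe\n", pushed);

    /* From here the data could be tee()d or splice()d onward. */
    close(fds[0]);
    close(fds[1]);
    return 0;
}
```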
Comments:

1. I wish I could +10 your answer. Yes, you've described my problem well, and you're right about the problem if one socket can't keep up. Each socket should go at the same speed, over time, unless one recipient fails, in which case the only sensible thing to do is drop it from the replication set anyway. But the thing that you've missed is that while pipes are 64KB by default, you can fcntl them up to 1MB (a limit which can itself be raised by modifying `/proc/sys/fs/pipe-max-size`). I have enough memory that I could allocate as much as 64MB to each pipe. What do you think?
2. I never knew about `F_SETPIPE_SZ`, thanks! I've edited my answer. Remember that 2.6.35 is still *a bit* new (e.g. Ubuntu 10.04 LTS is 2.6.32, AFAIK). The approach seems fine as long as you don't mind the Linux-specificity; I'd try to limit the scope of the code which is Linux-specific, just in case. The only other thing to remember is that this is only one aspect of the solution - if performance is critical, I'd suggest playing with non-blocking IO vs. threads vs. processes to see which works best for you. One of the nice things about pipes is that they work well across `fork()` if you need that.
3. One thing I forgot to mention - a multiprocess approach might seem an obvious way to improve performance on today's multicore systems, but bear in mind there's going to be a lot of memory sharing, so it's not straightforward. For example, on [NUMA](http://en.wikipedia.org/wiki/Non-Uniform_Memory_Access) architectures (e.g. AMD Opterons), multiple cores frequently accessing the same memory can create a performance hit. Even on [SMP](http://en.wikipedia.org/wiki/Symmetric_multiprocessing) systems it's not clear-cut whether multiple processes would buy you anything if your bottleneck is the memory bus.