C# Reactive Extensions - Buffer and Window

I was going through Buffer and Window in Rx and thought a few examples would help clarify the differences.

First, let's buffer a stream of even numbers, 5 items per buffer. (Ignore Timestamp() for now; it matters later, when we look at how buffering and windowing behave on streams whose items arrive over time.)

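using System;
using System.Linq;
using System.Reactive.Linq;   // from the System.Reactive NuGet package

int bufferId = 0;
var bufferedNumberStream = Observable.Range(0, 100)
    .Where(v => v % 2 == 0)   // keep even numbers only
    .Buffer(5)                // emit one IList<int> for every 5 items
    .Timestamp()              // wrap each buffer as Timestamped<IList<int>>
    .Subscribe(ts =>
    {
        Console.WriteLine("Buffer ID::" + bufferId++);
        ts.Value.ToList().ForEach(x => Console.WriteLine(x));
    });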

Output: 10 buffers are created, each holding 5 even numbers (0 to 8, then 10 to 18, and so on).

Next, a stream of odd numbers split into windows of 5 items each:

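int windowId = 0;

var windowedNumberStream = Observable.Range(0, 100)
    .Where(v => v % 2 == 1)   // keep odd numbers only
    .Timestamp()              // wrap each item so that ts.Value below compiles
    .Window(5, 5)             // count = 5, skip = 5
    .Subscribe(w =>
    {
        Console.WriteLine("Window ID::" + windowId++);
        // Each window is itself an IObservable, so subscribe again to read its items.
        w.Subscribe(ts => Console.WriteLine("Value::" + ts.Value));
    });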

The call .Window(5, 5) means that each window holds 5 items (count) and that a new window opens every 5 items (skip), so the windows do not overlap. You can read the two arguments as (count, skip) or (range, slide).
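To see how count and skip interact, here is a small sketch that collects every window into a list so the grouping is easy to print. It assumes the System.Reactive package; the Windows helper, the class name and the 1 to 6 input range are made up for illustration. Empty windows are filtered out so that only windows that actually received items are shown.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class WindowDemo
{
    // Hypothetical helper: materialises every non-empty window of 1..6 as a list.
    public static IList<IList<int>> Windows(int count, int skip) =>
        Observable.Range(1, 6)
            .Window(count, skip)
            .SelectMany(w => w.ToList())      // each window becomes one IList<int> when it closes
            .Where(items => items.Count > 0)  // ignore windows that never received an item
            .ToList()
            .Wait();                          // block until the source completes

    public static void Main()
    {
        foreach (var w in Windows(3, 3))
            Console.WriteLine("(3, 3): " + string.Join(", ", w));
        foreach (var w in Windows(3, 1))
            Console.WriteLine("(3, 1): " + string.Join(", ", w));
    }
}
```

With (3, 3) every item lands in exactly one window. With (3, 1) a new window opens on every item, so most items show up in three consecutive windows.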


Difference 1:

The primary difference between buffer and window is what each one hands you. A buffer gives you an IList<T> of items (IList<int> in our example), while a window gives you an IObservable<T> that you must subscribe to again to see the values inside it.

So .Window() returns an IObservable<IObservable<T>>: the stream of windows is observable, and the items inside every single window are also an IObservable.
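The type difference is easiest to see by spelling the types out instead of using var. A short sketch, assuming the System.Reactive package; the class and variable names are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class TypesDemo
{
    public static void Main()
    {
        IObservable<int> source = Observable.Range(0, 10);

        // Buffer: each notification is a finished list that can be read directly.
        IObservable<IList<int>> buffers = source.Buffer(5);
        buffers.Subscribe(list => Console.WriteLine("Buffer of " + list.Count + " items"));

        // Window: each notification is another observable, so a second
        // subscription is needed to reach the items inside it.
        IObservable<IObservable<int>> windows = source.Window(5);
        windows.Subscribe(w =>
            w.Count().Subscribe(n => Console.WriteLine("Window of " + n + " items")));
    }
}
```

A practical consequence is that a buffer is only delivered once it is complete, whereas a window is delivered as soon as it opens and its items flow through as they arrive.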

Difference 2:

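Buffer(n) starts a new buffer as soon as the current one has n items, so the buffers never overlap. A window as used here needs two things: how many items go into it, and how far to slide before the next window opens. When the slide is smaller than the count, the windows overlap and the same item is processed more than once, which the single-argument Buffer(n) never does. (Rx also provides a Buffer(count, skip) overload that yields overlapping lists in the same way.)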

A moving average is an excellent use case for a sliding window.
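As a rough sketch of that idea, the following computes a 3-item moving average with overlapping windows, using Window(3, 1) so that a new window opens on every item. It assumes the System.Reactive package; the MovingAverage helper and the sample prices are made up for illustration. Windows that are still short when the source completes are dropped, since they do not hold a full 3 items.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class MovingAverageDemo
{
    // Hypothetical helper: average over every full window of `size` consecutive items.
    public static IObservable<double> MovingAverage(IObservable<double> source, int size) =>
        source
            .Window(size, 1)                      // a new window starts at every item
            .SelectMany(w => w.ToList())          // collect each window once it closes
            .Where(items => items.Count == size)  // skip the short windows at the end
            .Select(items => items.Average());    // LINQ average over the collected list

    public static void Main()
    {
        var prices = new double[] { 10, 20, 30, 40, 50 }.ToObservable();
        MovingAverage(prices, 3).Subscribe(avg => Console.WriteLine("Average::" + avg));
        // prints Average::20, Average::30 and Average::40
    }
}
```

Collecting each window with ToList() gives up the streaming nature of the window in exchange for simplicity; it is one way to do it, not the only one.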

In the next post, let's see how buffer and window behave when we create them based on time rather than on the number of items in them.

