How to Merge Two Sorted Streams Into One Sorted Stream

The title says it all, doesn’t it? If we were given two sorted streams, how could we merge them into a single sorted stream?

The Data Structure

The term stream implies a structure whose elements can be retrieved and processed one at a time but whose underlying storage cannot be inspected or manipulated. Java has a couple of different streams, such as java.io.InputStream and java.util.stream.Stream, but for the purposes of this exercise we’ll use the simplest interface that lets us stream over elements: java.util.Iterator.

A First Pass

At first glance we may be tempted to write a single method that accepts two iterators and returns one iterator. Such a method could drain both iterators into a list, sort the list, and then return an iterator over the result. This approach has two flaws. The first is performance: it holds every element in memory and pays for an O(n log n) sort, even though both inputs are already sorted and could be merged in a single linear pass. The second flaw is fatal: the iterators could be infinite, in which case draining them never finishes.
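For contrast, here is a rough sketch of that first pass. The names NaiveMerge and naiveMerge are hypothetical, and the approach only terminates when both iterators are finite.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the "first pass": buffer everything, sort, re-iterate.
// Only safe for finite iterators; draining an infinite iterator never returns.
class NaiveMerge {

    static <T extends Comparable<? super T>> Iterator<T> naiveMerge(Iterator<T> a, Iterator<T> b) {
        List<T> all = new ArrayList<>();
        a.forEachRemaining(all::add); // stores every element of a in memory
        b.forEachRemaining(all::add); // ... and every element of b
        Collections.sort(all);        // O(n log n), ignoring that a and b were already sorted
        return all.iterator();
    }

    public static void main(String[] args) {
        Iterator<Integer> merged = naiveMerge(List.of(1, 3, 5).iterator(), List.of(2, 4, 6).iterator());
        merged.forEachRemaining(System.out::println);
    }
}
```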

Truly unbounded iterators are rare in the wild. But we could imagine, say, an iterator over the incoming requests to an HTTP server, sorted by request time. Such an iterator is sorted and never ends as long as the server is running. If you wanted to merge two such request streams (from two servers, or two request processing threads) into one time-ordered stream, a merge technique like this could be useful.
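To make “infinite” concrete without an HTTP server, here is a hypothetical stand-in: an iterator over the multiples of a step value. It is sorted and never reports that it is empty. The class name MultiplesOf is made up for illustration.

```java
import java.util.Iterator;

// Hypothetical unbounded sorted iterator: step, 2*step, 3*step, ...
// hasNext() never returns false. (Overflow of long is ignored in this sketch.)
class MultiplesOf implements Iterator<Long> {
    private final long step;
    private long current = 0;

    MultiplesOf(long step) {
        this.step = step;
    }

    @Override
    public boolean hasNext() {
        return true; // there is always another element
    }

    @Override
    public Long next() {
        current += step;
        return current;
    }

    public static void main(String[] args) {
        Iterator<Long> it = new MultiplesOf(3);
        // Safe: consume a fixed number of elements instead of draining the iterator.
        for (int i = 0; i < 5; i++) {
            System.out.println(it.next());
        }
    }
}
```

Any code that tries to drain an iterator like this, such as the list-and-sort approach, loops forever; a merge that only ever looks one element ahead can consume it safely.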

A Second Pass

OK, so we need to stream through these iterators without storing their values. Picture the two iterators as the two sides of a zipper that we are closing into one. In practice, we take the leading element of each stream, compare the two, and return the one that should appear earlier in sorted order. Since both streams are already sorted, the merged output comes out sorted, one element at a time.
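The zipper can be illustrated eagerly on two finite lists before making it lazy. This sketch uses one index per list instead of iterators; the class ZipperMerge and its zip method are hypothetical and exist only to show the compare-and-take step.

```java
import java.util.ArrayList;
import java.util.List;

// Eager illustration of the "zipper": walk two sorted lists with one index each,
// always taking the smaller leading element. Hypothetical helper, finite input only.
class ZipperMerge {

    static <T extends Comparable<? super T>> List<T> zip(List<T> a, List<T> b) {
        List<T> out = new ArrayList<>(a.size() + b.size());
        int i = 0;
        int j = 0;
        while (i < a.size() && j < b.size()) {
            // Compare the two leading elements; on a tie, take from a first.
            if (a.get(i).compareTo(b.get(j)) <= 0) {
                out.add(a.get(i++));
            } else {
                out.add(b.get(j++));
            }
        }
        // One side is used up; the remainder of the other side is already sorted.
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(zip(List.of(1, 3, 5), List.of(1, 2, 4, 6))); // prints [1, 1, 2, 3, 4, 5, 6]
    }
}
```

Each pass through the first loop does one comparison, so merging lists of lengths m and n takes at most m + n - 1 comparisons, rather than the O(n log n) work of sorting.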

To accomplish this, our merged iterator needs to keep references to the two incoming iterators, so it has to be its own class. It also needs to hold the “next” value from each iterator, so that it can compare the two buffered values on every call and return whichever comes first.


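import java.util.Iterator;

// Skeleton only: hasNext() and next() are filled in below.
// Comparable<? super T> (instead of the raw Comparable) lets next1.compareTo(next2) type-check.
public class MergedIterator<T extends Comparable<? super T>> implements Iterator<T> {

    private final Iterator<T> s1;
    private final Iterator<T> s2;
    private T next1; // buffered head of s1, or null once s1 is exhausted
    private T next2; // buffered head of s2, or null once s2 is exhausted

    public MergedIterator(Iterator<T> s1, Iterator<T> s2) {
        if (s1 == null || s2 == null) {
            throw new IllegalArgumentException("arguments can't be null");
        }

        this.s1 = s1;
        this.s2 = s2;

        // Pull the first element of each stream so the two can be compared.
        next1 = s1.hasNext() ? s1.next() : null;
        next2 = s2.hasNext() ? s2.next() : null;
    }

    @Override
    public boolean hasNext() { throw new UnsupportedOperationException("TODO"); }

    @Override
    public T next() { throw new UnsupportedOperationException("TODO"); }
}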

How to choose the next value

So we have an iterator backed by two other iterators, plus the buffered next value of each. We can compare these two values and return the one that comes first in sorted order. Before returning, we need to advance the iterator that value came from, so that a fresh value is in place for the comparison on any subsequent call to .next(). Assuming neither stream is empty yet, the body of MergedIterator.next() might look like this:

        // Assumes next1 and next2 are both non-null; ties (compareTo == 0) go to s1.
        if (next1.compareTo(next2) <= 0) {
            T returnObject = next1;
            next1 = s1.hasNext() ? s1.next() : null; // advance s1
            return returnObject;
        }
        else {
            T returnObject = next2;
            next2 = s2.hasNext() ? s2.next() : null; // advance s2
            return returnObject;
        }
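One detail worth spelling out is the `<= 0`: when the two heads compare as equal, the element from s1 is returned first. The sketch below uses a hypothetical Tagged type, ordered only by its key, and a hypothetical choose method that makes the same choice as the comparison above.

```java
// Hypothetical element type: ordered by key only; the source label is ignored by compareTo.
class Tagged implements Comparable<Tagged> {
    final int key;
    final String source;

    Tagged(int key, String source) {
        this.key = key;
        this.source = source;
    }

    @Override
    public int compareTo(Tagged other) {
        return Integer.compare(key, other.key);
    }
}

class TieBreak {
    // Same decision as next() when both heads are present: "<= 0" favors the first stream.
    static Tagged choose(Tagged next1, Tagged next2) {
        return next1.compareTo(next2) <= 0 ? next1 : next2;
    }

    public static void main(String[] args) {
        Tagged fromS1 = new Tagged(1, "s1");
        Tagged fromS2 = new Tagged(1, "s2");
        System.out.println(choose(fromS1, fromS2).source); // prints "s1"
    }
}
```

Because ties always favor s1, equal elements from s1 come out before equal elements from s2, and each stream keeps its own order, so the merge is stable with respect to its two inputs. Using `< 0` instead would still produce sorted output but would send ties to s2.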

What if an Iterator is Empty?

To indicate that an iterator is empty, the field holding that iterator’s next value can be set to null. (This means the backing iterators must never yield null elements themselves, because a null element would be mistaken for the end of the stream.) We can then check so that, once one iterator is exhausted, values always come from the other one, since it is the only one that can still provide them. We can add this to the top of MergedIterator.next():

        // if one stream is empty,
        // just continue streaming from the other
        if (next1 == null && next2 != null) {
            T returnObject = next2;
            next2 = s2.hasNext() ? s2.next() : null;
            return returnObject;
        }

        if (next1 != null && next2 == null) {
            T returnObject = next1;
            next1 = s1.hasNext() ? s1.next() : null;
            return returnObject;
        }
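The null sentinel has one consequence worth knowing: if a backing iterator can legitimately yield null, that null is indistinguishable from “exhausted”. If that matters, one alternative (not the approach used in this post) is to track exhaustion with a boolean. The Peeking class below is a hypothetical minimal sketch of such a wrapper; Guava’s Iterators.peekingIterator provides a comparable peek() operation.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical helper (not part of the original design): buffers one element and
// records exhaustion in a boolean, so a null element is not mistaken for "empty".
class Peeking<T> {
    private final Iterator<T> source;
    private boolean hasPeeked; // true while 'peeked' holds a real element (possibly null)
    private T peeked;

    Peeking(Iterator<T> source) {
        this.source = source;
        advance();
    }

    private void advance() {
        hasPeeked = source.hasNext();
        peeked = hasPeeked ? source.next() : null;
    }

    boolean hasNext() {
        return hasPeeked;
    }

    T peek() {
        if (!hasPeeked) {
            throw new NoSuchElementException();
        }
        return peeked;
    }

    T next() {
        T result = peek();
        advance();
        return result;
    }

    public static void main(String[] args) {
        Peeking<String> p = new Peeking<>(Arrays.asList("x", null, "y").iterator());
        while (p.hasNext()) {
            System.out.println(p.next()); // the null element is returned, not treated as the end
        }
    }
}
```

A merged iterator built on two such wrappers would call hasNext() on each wrapper instead of comparing buffered values with null. It would still need a rule for ordering null elements, since calling compareTo on null throws a NullPointerException.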

This also brings us to .hasNext(). If we implemented MergedIterator.hasNext() by calling .hasNext() on the backing iterators, we would lose the last element: once a backing iterator has handed over its final element, its hasNext() returns false, even though that element is still sitting in next1 or next2 waiting to be returned. So hasNext() should check the buffered values instead:

    @Override
    public boolean hasNext() {
        return next1 != null || next2 != null;
    }
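A tiny experiment makes the lost-element problem concrete. It mimics the constructor’s first pull on a one-element list; the class BufferedHasNextDemo and its check method are hypothetical.

```java
import java.util.Iterator;
import java.util.List;

// Shows why hasNext() must look at the buffered value rather than the backing iterator.
class BufferedHasNextDemo {

    static boolean[] check() {
        Iterator<Integer> s1 = List.of(42).iterator();
        Integer next1 = s1.hasNext() ? s1.next() : null; // what the constructor does

        boolean naive = s1.hasNext();     // false: s1 has already handed over its only element
        boolean buffered = next1 != null; // true: 42 is still waiting to be returned
        return new boolean[] { naive, buffered };
    }

    public static void main(String[] args) {
        boolean[] r = check();
        System.out.println(r[0] + " " + r[1]); // prints "false true"
    }
}
```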

Reducing Duplicate Code

Finally, we might notice the repeated code in next() and realize there is really only one thing to decide: which stream to take from. We can merge these conditions and end up with half the code at the expense of some readability. I’ll leave it as an exercise to the reader to judge which version is really more readable.

        // can use half the code at the expense of some readability
        boolean useStream1 = (next1 != null && next2 == null)  ||
                             (next1 != null && next2 != null && next1.compareTo(next2) <= 0);

        if(useStream1) {
            T returnObject = next1;
            next1 = s1.hasNext() ? s1.next() : null;
            return returnObject;            
        }
        else {
            T returnObject = next2;
            next2 = s2.hasNext() ? s2.next() : null;
            return returnObject;
        }
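If you do go with the shorter version, the condition can be factored once more. The sketch below (a hypothetical ConditionCheck class, using Integer elements for simplicity) checks that the factored form agrees with the two-clause form for every combination of null, smaller, equal and larger values.

```java
// Hypothetical check: the two-clause useStream1 condition vs. a factored equivalent.
class ConditionCheck {

    static boolean original(Integer next1, Integer next2) {
        return (next1 != null && next2 == null) ||
               (next1 != null && next2 != null && next1.compareTo(next2) <= 0);
    }

    static boolean factored(Integer next1, Integer next2) {
        // next1 must exist; then either stream 2 is empty or next1 sorts first (ties included)
        return next1 != null && (next2 == null || next1.compareTo(next2) <= 0);
    }

    public static void main(String[] args) {
        Integer[] values = {null, 1, 2};
        for (Integer a : values) {
            for (Integer b : values) {
                if (original(a, b) != factored(a, b)) {
                    throw new AssertionError("forms differ for " + a + ", " + b);
                }
            }
        }
        System.out.println("both forms agree");
    }
}
```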

With all of this together, here is the complete class.

import java.util.Iterator;
import java.util.NoSuchElementException;

public class MergedIterator<T extends Comparable<? super T>> implements Iterator<T> {

    private final Iterator<T> s1;
    private final Iterator<T> s2;
    private T next1; // buffered head of s1; null means s1 is exhausted (so s1 must not contain nulls)
    private T next2; // buffered head of s2; null means s2 is exhausted (so s2 must not contain nulls)

    public MergedIterator(Iterator<T> s1, Iterator<T> s2) {
        if (s1 == null || s2 == null) {
            throw new IllegalArgumentException("arguments can't be null");
        }

        this.s1 = s1;
        this.s2 = s2;

        next1 = s1.hasNext() ? s1.next() : null;
        next2 = s2.hasNext() ? s2.next() : null;
    }

    // NOT THREAD SAFE
    @Override
    public boolean hasNext() {
        // if you use .hasNext() on the backing iterators, you'll never get the last element
        return next1 != null || next2 != null;
    }

    // NOT THREAD SAFE
    @Override
    public T next() {

        // The Iterator contract requires NoSuchElementException when no elements remain.
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        // can use half the code at the expense of some readability
        boolean useStream1 = (next1 != null && next2 == null) ||
                             (next1 != null && next2 != null && next1.compareTo(next2) <= 0);

        if (useStream1) {
            T returnObject = next1;
            next1 = s1.hasNext() ? s1.next() : null;
            return returnObject;
        }
        else {
            T returnObject = next2;
            next2 = s2.hasNext() ? s2.next() : null;
            return returnObject;
        }
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not supported.");
    }

}

How To Test This

I propose these test cases for the incoming iterators: both empty, one or the other is empty, and both non-empty but one or the other is longer. A parameterized test will accomplish this nicely.

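import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class MergedIteratorTest {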

    private final List<Integer> expectedMerge;
    private final MergedIterator<Integer> mergedIterator;

    public MergedIteratorTest(List<Integer> stream1, List<Integer> stream2, List<Integer> expected) {
        expectedMerge = expected;
        mergedIterator = new MergedIterator<>(stream1.iterator(), stream2.iterator());
    }

    @Test
    public void testIterator() throws Exception {
        List<Integer> merged = streamIntoList(mergedIterator);
        assertEquals(expectedMerge, merged);
    }

    @Parameters
    public static List<Object[]> data() {
        return Arrays.asList(new Object[][]{
            {new ArrayList<>(), new ArrayList<>(), new ArrayList<>()},
            {Arrays.asList(1, 2, 3, 4, 5, 6), new ArrayList<>(), Arrays.asList(1, 2, 3, 4, 5, 6)},
            {new ArrayList<>(), Arrays.asList(1, 2, 3, 4, 5, 6), Arrays.asList(1, 2, 3, 4, 5, 6)},
            {Arrays.asList(1, 3, 5), Arrays.asList(1, 2, 4, 6), Arrays.asList(1, 1, 2, 3, 4, 5, 6)},
            {Arrays.asList(1, 2, 4, 6), Arrays.asList(1, 3, 5), Arrays.asList(1, 1, 2, 3, 4, 5, 6)}
        });
    }
    
    private static List<Integer> streamIntoList(Iterator<Integer> m) {
        List<Integer> merged = new ArrayList<>();
        while (m.hasNext()) {
            merged.add(m.next());
        }
        return merged;
    }
}

And there we have it! This was a fun little exercise about merging unbounded sorted streams, and importantly, how to test it. Maybe next time we can merge Java 8 Streams!

Filed under Software Engineering