/* // $Id: //guest/julian_hyde/saffron/src/main/saffron/runtime/ThreadIterator.java#1 $ // Saffron preprocessor and data engine // Copyright (C) 2002 Julian Hyde <julian.hyde@mail.com> // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Library General Public // License as published by the Free Software Foundation; either // version 2 of the License, or (at your option) any later version. // // This library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU // Library General Public License for more details. // // You should have received a copy of the GNU Library General Public // License along with this library; if not, write to the // Free Software Foundation, Inc., 59 Temple Place - Suite 330, // Boston, MA 02111-1307, USA. // // See the COPYING file located in the top-level-directory of // the archive of this library for complete text of license. */ package saffron.runtime; import java.util.Iterator; /** * <code>ThreadIterator</code> converts 'push' code to 'pull'. You implement * {@link #doWork} to call {@link #put} with each row, and this class invokes * it in a separate thread. Then the results come out via the familiar {@link * Iterator} interface. For example,<blockquote> * * <pre>class ArrayIterator extends ThreadIterator { * Object[] a_; * ArrayIterator(Object[] a) { * this.a_ = a; * start(); * } * protected void doWork() { * for (int i = 0; i < a_.length; i++) { * put(a[i]); * } * } * }</pre></blockquote> * * Or, more typically, using an anonymous class:<blockquote> * * <pre>Iterator i = new ThreadIterator() { * int limit; * public ThreadIterator start(int limit) { * this.limit = limit; * return super.start(); * } * protected void doWork() { * for (int i = 0; i < limit; i++) { * put(new Integer(i)); * } * } * }.start(100); * while (i.hasNext()) { * <em>etc.</em> * }</pre></blockquote> **/ public abstract class ThreadIterator extends QueueIterator implements Iterator, Runnable { public ThreadIterator() {} protected ThreadIterator start() { new Thread(this).start(); return this; } /** * The implementation should call {@link #put} with each row. **/ protected abstract void doWork(); // implement Runnable public void run() { boolean calledDone = false; try { doWork(); } catch (Throwable e) { done(e); calledDone = true; } finally { if (!calledDone) { done(null); } } } public static void main(String[] args) { new ThreadIteratorTest(new java.io.PrintWriter(System.out,true)); } }; /** * <code>QueueIterator</code> contains at most one object. If you call {@link * #next}, your thread will wait until another thread calls {@link #put} or * {@link #done}. Nulls are allowed. If the producer has an error, they can * pass it to the consumer via {@link #done}. **/ class QueueIterator implements Iterator { /** * Protects the <code>avail_</code> semaphore. **/ private boolean waitingForProducer_; private boolean hasNext_; private Object next_; private Throwable throwable_; /** * The producer notifies <code>empty</code> every time it produces an * object (or finishes). The consumer waits for it. **/ private Semaphore empty; /** * <p>Conversely, the consumer notifies <code>full</code> every time * it reads the next object. The producer waits for it, then starts * work.</p> **/ private Semaphore full; QueueIterator() { empty = new Semaphore(0); full = new Semaphore(1); waitingForProducer_ = true; } // implement Iterator public synchronized boolean hasNext() { if (waitingForProducer_) { empty.enter(); // wait for producer to produce one waitingForProducer_ = false; } if (!hasNext_) { checkError(); } return hasNext_; } // implement Iterator public synchronized Object next() { if (waitingForProducer_) { empty.enter(); // wait for producer to produce one waitingForProducer_ = false; } if (!hasNext_) { throw new Error("no more"); } Object o = next_; waitingForProducer_ = true; full.leave(); return o; } // implement Iterator public void remove() { throw new UnsupportedOperationException(); } /** * Throws an error if one has been set via {@link done(Throwable)}. **/ private void checkError() { if (throwable_ == null) { ; } else if (throwable_ instanceof RuntimeException) { throw (RuntimeException) throwable_; } else if (throwable_ instanceof Error) { throw (Error) throwable_; } else { throw new Error("error: " + throwable_); } } /** * Producer calls <code>put</code> to add another object (which may be * null). **/ public void put(Object o) { // This method is NOT synchronized. If it were, it would deadlock with // the waiting consumer thread. The consumer thread has called // full.leave(), and that is sufficient. full.enter(); // wait for consumer thread to use previous hasNext_ = true; next_ = o; empty.leave(); // wake up consumer thread } /** * Producer calls <code>done</code> to say that there are no more objects, * setting <code>throwable</code> if there was an error. **/ public void done(Throwable throwable) { // This method is NOT synchronized. If it were, it would deadlock with // the waiting consumer thread. The consumer thread has called // consumed.leave(), and that is sufficient. full.enter(); // wait for consumer thread to use previous hasNext_ = false; next_ = null; throwable_ = throwable; empty.leave(); // wake up consumer thread } }; class Semaphore { int count; Semaphore(int count) { this.count = count; } synchronized void enter() { while (count <= 0) { try { wait(); } catch (InterruptedException e) { } } // we have control, decrement the count count--; } synchronized void leave() { count++; notify(); } }; /** * For testing. **/ class ArrayIterator extends ThreadIterator { Object[] a_; ArrayIterator(Object[] a) { this.a_ = a; start(); } protected void doWork() { for (int i = 0; i < a_.length; i++) { this.put(a_[i]); } } }; /** * Test harness for {@link ThreadIterator}. **/ class ThreadIteratorTest { ThreadIteratorTest(java.io.PrintWriter pw) { pw.print("Should print {x,y,z}: "); for (Iterator i = new ArrayIterator(new String[] { "x", "y", "z"}); i.hasNext(); ) { pw.print((String) i.next()); } pw.println("."); pw.print("Should print nothing"); for (Iterator i = new ArrayIterator(new Object[] {}); i.hasNext(); ) { pw.print(i.next()); } pw.println("."); pw.print("Should print {0..9}: "); Iterator iter = new ThreadIterator() { int limit; public ThreadIterator start(int limit) { this.limit = limit; return super.start(); } protected void doWork() { for (int i = 0; i < limit; i++) { put(new Integer(i)); } } }.start(10); while (iter.hasNext()) { pw.print((Integer) iter.next()); } pw.println("."); pw.print("Should print {2,5} then get a NullPointerException: "); iter = new ThreadIterator() { String strings[]; public ThreadIterator start(String[] strings) { this.strings = strings; return start(); } protected void doWork() { for (int i = 0; i < strings.length; i++) { put(new Integer(strings[i].length())); } } }.start(new String[] {"john","paul", null, "ringo"}); try { while (iter.hasNext()) { pw.print((Integer) iter.next()); } } catch (NullPointerException e) { pw.print("got NPE"); } pw.println("."); } }; // End ThreadIterator.java
# | Change | User | Description | Committed | |
---|---|---|---|---|---|
#2 | 1748 | Julian Hyde |
saffron: convert unit tests to JUnit; add CallingConvention.ITERABLE; lots of other stuff; release 0.1. |
||
#1 | 1467 | Julian Hyde |
saffron: First saffron check-in; incorporate my changes to openjava. |