使用 wait() 和 notify() 机制来完成“睡眠”和“踢”。实际的消费者工作由 OnConsume(Object) 方法处理,如清单 3 所示:
清单 3. 唤醒和通知 Consumer
/**
* Add an object to the Consumer.
* This is the entry point for the producer.
* After the item is added, the Consumer´s thread
* will be notified.
*
* @param the object to be ´consumed´ by this consumer
*/
public void add(Object o)
{
_queue.add(o);
kickThread();
}
/**
* Wake up the thread (without adding new stuff to consume)
*
*/
public void kickThread()
{
if (!this._thread.isInterrupted())
{
synchronized(_waitForJobsMonitor)
{
_waitForJobsMonitor.notify();
}
}
}
示例:MessagesProcessor
为了向您展示 Consumer 类是如何工作的,我们将使用一个简单示例。MessagesProcessor 类以异步方式处理进入的消息(也就是说,不干扰调用线程)。其工作是在每个消息到来时打印它。MessagesProcessor 具有一个处理到来的消息作业的内部 Consumer。当新作业进入空队列时,Consumer 调用 processMessage(String) 方法来处理它,如清单 4 所示:
清单 4. MessagesProcessor 类
class MessagesProcessor
{
String _name;
// anonymous inner class that supplies the consumer
// capabilities for the MessagesProcessor
private Consumer _consumer = new Consumer()
{
// that method is called on each event retrieved
protected void onConsume(Object o)
{
if (!(o instanceof String))
{
System.out.println("illegal use, ignoring");
return;
}
MessagesProcesser.this.processMessage((String)o);
}
}.setName("MessagesProcessor").init();
public void gotMessageEvent(String s)
{
_consumer.add(s);
}
private void processMessage(String s)
{
System.out.println(_name+" processed message: "+s);
}
private void terminate()
{
_consumer.terminateWait();
_name = null;
}
MessagesProcessor()
{
_name = "Example Consumer";
}
}
正如您可以从上面的代码中所看到的,定制 Consumer 相当简单。我们使用了一个匿名内部类来继承 Consumer 类,并重载抽象方法 onConsume()。因此,在我们的示例中,只需调用 processMessage。
Consumer 类的高级特性
除了开始时提出的基本需求以外,我们还为 Consumer 类提供了一些我们觉得有用的高级特性。
事件通知
onThreadTerminate():只在终止 Consumer 前调用该方法。我们出于调试目的覆盖了这个方法。
goingToRest():只在 Consumer 线程进入休眠前调用该方法(也就是说,只在调用 _waitForJobsMonitor.wait() 之前调用)。只在需要消费者在进入休眠之前处理一批已处理工作的复杂情况中,可能需要这种通知。
终止
terminate():Consumer 线程的异步终止。
terminateWait():设置调用线程一直等待,直到消费者线程实际终止为止。
在我们的示例中,如果使用 terminate() 而不是 terminateWait(),那么将会出现问题,因为在将 _name 设置成空值之后调用 onConsume() 方法。这将导致执行 processMessage 的线程抛出一个 NullPointerException。
结束语:Consumer 类的好处
可在参考资料一节下载 Consumer 类的源代码。请自由使用源代码,并按照您的需要扩展它。我们发现将这个类用于多线程应用程序开发有许多好处:
代码重用/重复代码的消除:如果您有 Consumer 类,就不必为您应用程序中的每个实例编写一个新的消费者。如果在应用程序开发中频繁使用生产者-消费者方案,这可以很大程度地节省时间。另外,请牢记重复代码是滋生错误的沃土。它还使基本代码的维护更为困难。
更少错误:使用验证过的代码是一种防止错误的好实践,尤其是处理多线程应用程序时。因为 Consumer 类已经被调试过,所以它更安全。消费者还通过在线程和资源之间担任安全中介来防止与线程相关的错误。消费者可以代表其它线程以顺序的方式访问资源。
漂亮、清晰的代码:使用 Consumer 类有助于我们编写出更简单的代码,这样的代码更容易理解和维护。如果我们不使用 Consumer 类,就必须编写代码来处理两种不同的功能:消费逻辑(队列和线程管理、同步等)和指定消费者的用法或功能的代码。
多谢 Allot Communications 的 Jonathan Lifton 和 Dov Trietsch 对本文提供的帮助。
参考资料
请单击本文顶部或底部的讨论参与本文的论坛。
请下载 Consumer 类的源代码。请自由使用该源代码,并根据您的需要扩展它。
Alex Roetter 的“Writing multithreaded Java applications”(developerWorks,2001 年 2 月)很好地介绍了使用“Java 线程 API”来开发多线程应用程序。
Brian Goetz 关于多线程的三部分系列文章(developerWorks,2001 年 7 月至 2001 年 9 月)提供了对于多线程应用程序开发中产生的某些公共问题的实用解决方案。请阅读整个系列文章:
第 1 部分:“同步不是敌人”
第 2 部分:“减少争用”
第 3 部分:“有时最好不要共享”
如果 Allen Holub 是国王,那么 Java 语言在对多线程的支持方面将会经历许多重要的更改。关于这些更改的所有内容,请阅读他的修改 Java 语言的线程模型的提议(developerWorks,2000 年 10 月)。
Eric Allen 的专栏文章“Diagnosing Java Code:The Orphaned Thread bug pattern”(developerWorks,2001 年 8 月)提供了关于多线程代码调试的提示。请注意 Eric 在其示例中使用了生产者-消费者方案。
Java Developer Connection 为那些进行线程同步研究的人提供了一条有价值的线索。它的一个关键示例使用了生产者-消费者方案。
如何在 Java 应用程序中支持已计划安排的事件?为了方便实现这一任务,Sun Microsystems 在 JDK 1.3 中引入了新的 Timer API。
Java Developer Connection 的文章“An Introduction to Java Stack Traces”向您展示了如何在堆栈跟踪中识别和收集线索,以解决您的 Java 软件问题。
IBM Developer Kit for Linux,Java 技术版,版本 1.3 提供了对多线程应用程序开发的完整支持。
在 developerWorks Java 技术专区中,关于 Java 编程的每个方面,您都可以找到数百篇文章。
关于作者
Joseph(Saffi)Hartal 是 GlobaLoop LTD 的软件开发人员。Saffi 拥有 Tel-Aviv 大学计算机科学及数学学士学位和工商管理硕士学位。最近 10 年,他一直从事软件开发,在此期间,他用 C++ 编写了实时嵌入代码,还编写了 Java 客户机和服务器应用程序。Saffi 将大多数时间用于编写基础结构代码和解决难题。可以通过 saffi@myrealbox.com 与他联系。
Ze´ev Bubis 是 GlobaLoop LTD 一个软件开发小组的负责人。Ze´ev 拥有 Tel-Aviv 大学计算机科学及数学学士学位。最近 10 年,他一直从事软件开发,在此期间,他为多种平台和语言编写软件应用程序。最近三年中,Ze´ev 专注于用 Java 开发客户机和服务器应用程序。可以通过 zeevb@myrealbox.com 与他联系。