AKKA 邮箱Mailbox

阅读:1008 2019-03-20 14:09:12 来源:新网

邮箱mailbox

akkamailbox持有发给actor的消息。通常,每一个actor都与自己的邮箱,但是当使用balancingpool时,所有的路由都共享一个邮箱实例。

邮箱选择

actor需要的消息队列类型

特定类型的actor可以用特定类型的消息队列,只要这个actor实现了参数化的接口requiresmessagequeue。这里是一个例子:

requiresmessagequeue接口的类型参数需要在配置中映射到一个邮箱,就像这样:

现在每次你创建一个类型为myboundeduntypedactor的actor,它都将会尝试获取一个有界邮箱。如果actor在部署时配置了不同的邮箱,可能是直接配置的,也可能是通过带有特定邮箱类型的分发器,那么就会覆写这个映射。

注意

为actor创建的邮箱中的队列类型用接口中要求的类型进行检查,如果队列没有实现要求的类型,那么actor创建就会失败。

dispatcher需要的消息队列类型

分发器也需要一个邮箱类型,用于运行中的actor。一个例子就是balancingdispatcher,它需要一个并发的、线程安全的消息队列。这样的需求可以在分发器配置中进行规划,就像这样:

给定的需求命名了一个类或者接口,必须保证这个类或者接口是消息队列实现的超类型。万一冲突了,例如如果actor需要一个邮箱类型,但是它不满足这个需求,那么actor创建就会失败。

如何选择邮箱类型

当创建actor时,actorrefprovider首先确定分发器,分发起会执行actor。然后按照如下顺序确定邮箱类型:

默认邮箱

当按照上述描述的依然没有指定邮箱。那么就会使用默认的邮箱。默认邮箱his一个无界邮箱,是由java.util.concurrent.concurrentlinkedqueue支持的。

singleconsumeronlyunboundedmailbox是更高效的邮箱,它可被用于默认邮箱,但是它不能被用于balancingdispatcher。

将singleconsumeronlyunboundedmailbox配置为默认邮箱:

那些配置会传给mailbox类型

每一个邮箱类型都继承自mailboxtype,它的构造函数有两个参数:actorsystem.settings对象和config对象。后面这个是通过actor系统的配置获取,用邮箱类型的配置路径覆盖它的id关键字,并添加一个默认邮箱配置的回调。

内置的mailbox实现

akka自带了很多邮箱实现:

这个队列可能会或可能不会比默认邮箱更快,取决于你的使用场景—请确保进行过适当的基准测试!

其它的有界邮箱实现如果达到最大容量,并且配置了non-zeromailbox-push-timeout-time,会阻塞发送者。

注意

下面的邮箱只应该用于mailbox-push-timeout-time为0的情况。

如果创建prioritymailbox:

importcom.typesafe.config.config;

importakka.actor.actorsystem;

importakka.actor.poisonpill;

importakka.dispatch.prioritygenerator;

importakka.dispatch.unboundedprioritymailbox;

publicclassmypriomailboxextendsunboundedprioritymailbox{

//用于反射实例化

publicmypriomailbox(actorsystem.settingssettings,configconfig){

//创建一个新的prioritygenerator,低优先级意味着更重要

super(newprioritygenerator(){

@override

publicintgen(objectmessage){

if(message.equals("highpriority"))

return0;//如果可能的话,高优先级的消息应该被优先处理

elseif(message.equals("lowpriority"))

return2;//如果低优先级的消息应该被最后处理

elseif(message.equals(poisonpill.getinstance()))

return3;//当没有剩余时,则为处理poisonpill

else

return1;//默认位于高优先级和低优先级

}

});

}

}

然后把它添加到配置中:

prio-dispatcher{

mailbox-type="docs.dispatcher.dispatcherdocspec$mypriomailbox"

//otherdispatcherconfigurationgoeshere

}

下面是使用这个邮箱的例子:

importcom.typesafe.config.config;

importcom.typesafe.config.configfactory;

importakka.actor.actorref;

importakka.actor.actorsystem;

importakka.actor.poisonpill;

importakka.actor.props;

importakka.actor.untypedactor;

importakka.event.logging;

importakka.event.loggingadapter;

publicclassdemoextendsuntypedactor{

loggingadapterlog=logging.getlogger(getcontext().system(),this);

{

for(objectmsg:newobject[]{"lowpriority","lowpriority","highpriority","pigdog","pigdog2","pigdog3",

"highpriority",poisonpill.getinstance()}){

getself().tell(msg,getself());

}

}

publicvoidonreceive(objectmessage){

log.info(message.tostring());

}

publicstaticvoidmain(string[]args){

configconfig=configfactory.parsestring("akka.loglevel=debugn"+"akka.actor.debug.lifecycle=on");

//wecreateanewactorthatjustprintsoutwhatitprocesses

actorsystemsystem=actorsystem.create("mailbox");

actorrefmyactor=system.actorof(props.create(demo.class).withdispatcher("prio-dispatcher"),"demo");

system.terminate();

}

}

运行输出:

[info][12/24/201623:38:22.364][mailbox-prio-dispatcher-5][akka://mailbox/user/demo]highpriority

[info][12/24/201623:38:22.364][mailbox-prio-dispatcher-5][akka://mailbox/user/demo]highpriority

[info][12/24/201623:38:22.364][mailbox-prio-dispatcher-5][akka://mailbox/user/demo]pigdog

[info][12/24/201623:38:22.364][mailbox-prio-dispatcher-5][akka://mailbox/user/demo]pigdog2

[info][12/24/201623:38:22.364][mailbox-prio-dispatcher-5][akka://mailbox/user/demo]pigdog3

[info][12/24/201623:38:22.365][mailbox-prio-dispatcher-5][akka://mailbox/user/demo]lowpriority

[info][12/24/201623:38:22.365][mailbox-prio-dispatcher-5][akka://mailbox/user/demo]lowpriority

也可以直接配置邮箱类型,就像这样:

prio-mailbox{

mailbox-type="docs.dispatcher.dispatcherdocspec$mypriomailbox"

//othermailboxconfigurationgoeshere

}

akka.actor.deployment{

/priomailboxactor{

mailbox=prio-mailbox

}

}

然后就可以使用来自部署的邮箱类型:

actorrefmyactor=

system.actorof(props.create(myuntypedactor.class),

"priomailboxactor");

或这样:

actorrefmyactor=

system.actorof(props.create(myuntypedactor.class)

.withmailbox("prio-mailbox"));

一个值得上千次吹嘘的例子:

importakka.actor.actorref;

importakka.actor.actorsystem;

importakka.dispatch.envelope;

importakka.dispatch.mailboxtype;

importakka.dispatch.messagequeue;

importakka.dispatch.producesmessagequeue;

importcom.typesafe.config.config;

importjava.util.concurrent.concurrentlinkedqueue;

importjava.util.queue;

importscala.option;

publicclassmyunboundedjmailboximplementsmailboxtype,producesmessagequeue{

//thisisthemessagequeueimplementation

publicstaticclassmymessagequeueimplementsmessagequeue,myunboundedjmessagequeuesemantics{

privatefinalqueuequeue=newconcurrentlinkedqueue();

//thesemustbeimplemented;queueusedasexample

publicvoidenqueue(actorrefreceiver,envelopehandle){

queue.offer(handle);

}

publicenvelopedequeue(){

returnqueue.poll();

}

publicintnumberofmessages(){

returnqueue.size();

}

publicbooleanhasmessages(){

return!queue.isempty();

}

publicvoidcleanup(actorrefowner,messagequeuedeadletters){

for(envelopehandle:queue){

deadletters.enqueue(owner,handle);

}

}

}

//thisconstructorsignaturemustexist,itwillbecalledbyakka

publicmyunboundedjmailbox(actorsystem.settingssettings,configconfig){

//putyourinitializationcodehere

}

//thecreatemethodiscalledtocreatethemessagequeue

publicmessagequeuecreate(optionowner,optionsystem){

returnnewmymessagequeue();

}

}

//markerinterfaceusedformailboxrequirementsmapping

publicinterfacemyunboundedjmessagequeuesemantics{

}

然后只需要将分发器配置或mailbox配置的"mailbox-type"的值指定为你的mailboxtype的全限定名。

注意

确保包含一个参数为akka.actor.actorsystem.settings和com.typesafe.config.configmake的构造函数,因为这个构造函数会被反射调用,以便构建你的邮箱类型。传入的第二参数config是来自于配置中描述使用了这个邮箱类型的分发器和邮箱设置的那部分。对于每一个分发器和邮箱,邮箱类型只实例化一次。

你也可以使用邮箱类型作为分发器的必要条件,就像这样:

custom-dispatcher{

mailbox-requirement=

"docs.dispatcher.myunboundedjmessagequeuesemantics"

}

akka.actor.mailbox.requirements{

"docs.dispatcher.myunboundedjmessagequeuesemantics"=

custom-dispatcher-mailbox

}

custom-dispatcher-mailbox{

mailbox-type="docs.dispatcher.myunboundedjmailbox"

}

或者在你的actor上定义必备条件,就像这样:

importakka.actor.untypedactor;

importakka.dispatch.requiresmessagequeue;

publicclassmyspecialactorextendsuntypedactorimplementsrequiresmessagequeue{

@override

publicvoidonreceive(objectarg0)throwsexception{

//todoauto-generatedmethodstub

}

}

为了让system.actorof既同步又是非阻塞的,同时保持返回类型actorref(和返回的引用功能齐全的语义),这种情况下需要特殊处理。在这些场景背后,构造了虚拟的actor引用,这些引用被发送到系统的守护actor,守护actor实际创建actor和它的上下文,并将它们放入引用内部。untilthathashappened,发送到actorref的消息将被本地排队,一旦交换实际的填写,它们将被转移到真正的邮箱。因此,

finalpropsprops=...

//这个actor使用mycustommailbox,假设它是单例

system.actorof(props.withdispatcher("mycustommailbox").tell("bang",sender);

assert(mycustommailbox.getinstance().getlastenqueued().equals("bang"));

可能会失败;你将不得不允许一些时间来匆忙地传入检查并重试。testkit.awaitcond。

相关文章
{{ v.title }}
{{ v.description||(cleanHtml(v.content)).substr(0,100)+'···' }}
你可能感兴趣
推荐阅读 更多>
推荐商标

{{ v.name }}

{{ v.cls }}类

立即购买 联系客服