假设有一个很耗时的运算,单台机器已经没法满足需求,这时你可以想到由多台计算机协作完成。具体怎么做呢。
举个很简单的例子,假设这个耗时的运算是从1加到100000,你现在有两台服务器,可以让这两台服务器分别完成从1加到50000,和从50001加到100000,然后本机完成这两个结果之和。
两台服务器分别启动两个akka Server,同时还有一个CalcActor。这个计算actor接收两个参数:Integer start和Integer end,可以从start一直加到end,最后将结果返回给发送者:getSender().tell(result)。
@Log4j class CalcActor extends UntypedActor { @Override void onReceive(Object message) { log.debug "CalcActor received: ${message}----self:${getSelf()},sender:${getSender()}" if (message instanceof String) { String[] args = message.split(",") int start = Integer.parseInt(args[0]) int end = Integer.parseInt(args[1]) double result = 0d println("start calc:" + start + " upto " + end) start.upto(end) { result += it } sleep(5000) //模拟还要额外耗时5秒 println("result:" + result) getSender().tell(result) } else { unhandled(message) } } } 两个服务器分别为:
AkkaServerApp serverA = new AkkaServerApp("sc", "10.68.3.122", 8888, "calc") //AkkaSystemName为sc,ip为10.68.3.122,端口为8888,serviceName为calc。 AkkaServerApp serverA = new AkkaServerApp("sp", "10.68.3.124", 8888, "calc")//AkkaSystemName为sp,ip为10.68.3.124,端口为8888,serviceName为calc。
主要的代码在客户端:
public static void main(String[] args) throws Exception { final AkkaServerApp app = new AkkaServerApp("xwc", "127.0.0.1", 6666, "client");//客户端akka配置 ActorRef remoteCalcA1 = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA1");//将CalcActor发布到远程10.68.3.122上 ActorRef remoteCalcA2 = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA2");//将CalcActor发布到远程10.68.3.124上 final List<Future<Double>> frs = new ArrayList<Future<Double>>();//异步返回结果Future存放在list中 //tell只请求,是否响应它完全不知道。ask是请求,并明确知道未来会相应。 // remoteCalcA.tell("1,10000", app.getServerActor()); // remoteCalcB.tell("10001,20000", app.getServerActor()); Future f1 = akka.pattern.Patterns.ask(remoteCalcA1, "1,50000", 150000);//让远程122计算从1加到50000,超时时间为150秒 Future f2 = akka.pattern.Patterns.ask(remoteCalcA1, "50001,100000", 150000);//并发地让远程124计算从50001加到100000,超时时间150秒 frs.add(f1); frs.add(f2); Future<Iterable<Double>> future = Futures.sequence(frs, app.getSystem().dispatcher());将未来返回的结果转换成Future<Iterable<Double>> Future<Double> fr = future.map(new Mapper<Iterable<Double>, Double>() { @Override public Double apply(Iterable<Double> parameter) { Double result = 0d; for (Double s : parameter) {//计算两个服务器返回的结果 result += s; } return result; } }); fr.onSuccess(new OnSuccess<Double>() { @Override public void onSuccess(Double result) { System.out.println("云计算返回结果-----" + result); } }); }
还可以让服务器并发处理:把给从1加到50000的任务分成5个线程并行处理:1..10000,10001..20000,20001..30000,30001..40000,40001..50000,这样能更好地提高效率。
如果按上面的方法仅仅是发布多个remote actor:
ActorRef remoteCalcAn = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcAn");
是没法提高效率的,因为这时的CalcActor是单线程的,它只会先接收1..10000,处理完后再接收10001..20000并处理。。。。。
使其能够并行处理很简单,创建remoteActor时加上withRoute即可:
ActorRef remoteCalcAn = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(5)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcAn"); //RoundRobinRouter的参数5可以理解为分配5个线程并行处理
代码跟上面基本相同
public static void main(String[] args) throws Exception { final AkkaServerApp app = new AkkaServerApp("xwc", "127.0.0.1", 6666, "client"); ActorRef remoteCalcA1 = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(4)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA1"); ActorRef remoteCalcB1 = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(4)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sp", "10.68.3.124", 8888)))), "clientCalcB1"); final List<Future<Double>> frs = new ArrayList<Future<Double>>(); Future f1 = akka.pattern.Patterns.ask(remoteCalcA1, "1,10000", 150000); Future f2 = akka.pattern.Patterns.ask(remoteCalcA1, "10001,20000", 150000); Future f3 = akka.pattern.Patterns.ask(remoteCalcA1, "20001,30000", 150000); Future f4 = akka.pattern.Patterns.ask(remoteCalcA1, "30001,40000", 150000); Future f5 = akka.pattern.Patterns.ask(remoteCalcB1, "40001,50000", 150000); Future f6 = akka.pattern.Patterns.ask(remoteCalcB1, "50001,60000", 150000); Future f7 = akka.pattern.Patterns.ask(remoteCalcB1, "60001,70000", 150000); Future f8 = akka.pattern.Patterns.ask(remoteCalcB1, "70001,80000", 150000); frs.add(f1); frs.add(f2); frs.add(f3); frs.add(f4); frs.add(f5); frs.add(f6); frs.add(f7); frs.add(f8); Future<Iterable<Double>> future = Futures.sequence(frs, app.getSystem().dispatcher()); Future<Double> fr = future.map(new Mapper<Iterable<Double>, Double>() { @Override public Double apply(Iterable<Double> parameter) { Double result = 0d; for (Double s : parameter) { result += s; } return result; } }); fr.onSuccess(new OnSuccess<Double>() { @Override public void onSuccess(Double result) { System.out.println("云计算返回从1加到80000的结果-----" + result); } }); } http://m.oschina.net/blog/81118
相关推荐
Akka is a distributed computing toolkit that enables developers to build correct concurrent and distributed applications using Java and Scala with ease, applications that scale across servers and ...
Akka in Action shows you how to build message-oriented systems with Akka. This comprehensive, hands-on tutorial introduces each concept with a working example. You’ll start with the big picture of ...
FIABTurntable:使用Guice + Akka C2库(C2Akka)的FIABTurntable实现
Akka Essentials,学习akka很好的一本书
akka-kryo-serialization, 基于Kryo的Akka序列化 akka-kryo-serialization-- Scala 和Akka基于kryo的序列化程序这个库为 Scala 和Akka提供定制的基于kryo的序列化程序。 它可以用于更高效的akka远程处理。它还可以...
第 2 章 Actor 与并发:响应式编程。Actor 与 Future 的使用。 第 3 章 传递消息:消息传递模式。 第 4 章 Actor 的生命周期—处理状态与错误:Actor 生命周期、监督机制、Stash/ Unstash、Become/Unbecome 以及有限...
akka 实战。akka in action。v13 2014新版。 互联网技术入门必备 清晰,非扫描。
akka_2.10
akka实例 java实现tcp远程调用,一个服务端,一个客户端
赠送jar包:akka-actor_2.11-2.5.19.jar; 赠送原API文档:akka-actor_2.11-2.5.19-javadoc.jar; 赠送源代码:akka-actor_2.11-2.5.19-sources.jar; 赠送Maven依赖信息文件:akka-actor_2.11-2.5.19.pom; 包含...
第 2 章 Actor 与并发:响应式编程。Actor 与 Future 的使用。 第 3 章 传递消息:消息传递模式。 第 4 章 Actor 的生命周期—处理状态与错误:Actor 生命周期、监督机制、Stash/ Unstash、Become/Unbecome 以及有限...
Learning Akka Learning Akka Learning AkkaLearning Akka
In March 2010 I noticed a tweet by Dean Wampler that made me look into Akka: W00t! RT @jboner: #akka 0.7 is released: http://bit.ly/9yRGSB After some investigation into the source code and building a ...
Akka is Open Source and available under the Apache 2 License. Download from http://akka.io/downloads. Please note that all code samples compile, so if you want direct access to the sources, have a ...
初学akka使用实例,有很好的帮助啊,可实际运行
akka-data-replication, 在Akka集群中,复制 CRDTs Akka分布式数据这个库的( akka-data-replication ) 已经包含在Akka中,在模块中分发数据。英镑不在/akka-data-replication中维护。 所有 Bug 修复和新功能将在 ...
另外,本书介绍了 Actor 模型的一个实现框架 Akka 以及它的工具,而后讨论了在充分利用 actor 架构的基础上使用 Akka 框架来设计软件系统的方法,以及使用它来开发并发性和分布式应用程序的方怯。本书还介绍了领域 ...
Chapter 2. Introducing Akka Chapter 3. Distributed Domain-Driven Design Chapter 4. Good Actor Design Chapter 5. Good Data Flow Chapter 6. Consistency and Scalability Chapter 7. Fault Tolerance Chapter...
akka学习入门实践
赠送jar包:akka-stream_2.11-2.5.21.jar; 赠送原API文档:akka-stream_2.11-2.5.21-javadoc.jar; 赠送源代码:akka-stream_2.11-2.5.21-sources.jar; 赠送Maven依赖信息文件:akka-stream_2.11-2.5.21.pom; ...