`
m635674608
  • 浏览: 4932154 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Akka2

    博客分类:
  • java
 
阅读更多

假设有一个很耗时的运算,单台机器已经没法满足需求,这时你可以想到由多台计算机协作完成。具体怎么做呢。

举个很简单的例子,假设这个耗时的运算是从1加到100000,你现在有两台服务器,可以让这两台服务器分别完成从1加到50000,和从50001加到100000,然后本机完成这两个结果之和。

 

两台服务器分别启动两个akka Server,同时还有一个CalcActor。这个计算actor接收两个参数:Integer start和Integer end,可以从start一直加到end,最后将结果返回给发送者:getSender().tell(result)。

@Log4j
class CalcActor extends UntypedActor {
    @Override
    void onReceive(Object message) {
        log.debug "CalcActor received: ${message}----self:${getSelf()},sender:${getSender()}"
        if (message instanceof String) {
            String[] args = message.split(",")
            int start = Integer.parseInt(args[0])
            int end = Integer.parseInt(args[1])
            double result = 0d
            println("start calc:" + start + " upto " + end)
            start.upto(end) {
                result += it
            }
            sleep(5000) //模拟还要额外耗时5秒
            println("result:" + result)
            getSender().tell(result)
        } else {
            unhandled(message)
        }
    }
}
两个服务器分别为:
AkkaServerApp serverA = new AkkaServerApp("sc", "10.68.3.122", 8888, "calc") //AkkaSystemName为sc,ip为10.68.3.122,端口为8888,serviceName为calc。

    AkkaServerApp serverA = new AkkaServerApp("sp", "10.68.3.124", 8888, "calc")//AkkaSystemName为sp,ip为10.68.3.124,端口为8888,serviceName为calc。

主要的代码在客户端:
    public static void main(String[] args) throws Exception {

        final AkkaServerApp app = new AkkaServerApp("xwc", "127.0.0.1", 6666, "client");//客户端akka配置
        ActorRef remoteCalcA1 = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA1");//将CalcActor发布到远程10.68.3.122上
        ActorRef remoteCalcA2 = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA2");//将CalcActor发布到远程10.68.3.124上

        final List<Future<Double>> frs = new ArrayList<Future<Double>>();//异步返回结果Future存放在list中
//tell只请求,是否响应它完全不知道。ask是请求,并明确知道未来会相应。        
//        remoteCalcA.tell("1,10000", app.getServerActor());
//        remoteCalcB.tell("10001,20000", app.getServerActor());
        Future f1 = akka.pattern.Patterns.ask(remoteCalcA1, "1,50000", 150000);//让远程122计算从1加到50000,超时时间为150秒
        Future f2 = akka.pattern.Patterns.ask(remoteCalcA1, "50001,100000", 150000);//并发地让远程124计算从50001加到100000,超时时间150秒
        frs.add(f1);
        frs.add(f2);

        Future<Iterable<Double>> future = Futures.sequence(frs, app.getSystem().dispatcher());将未来返回的结果转换成Future<Iterable<Double>>
        Future<Double> fr = future.map(new Mapper<Iterable<Double>, Double>() {

            @Override
            public Double apply(Iterable<Double> parameter) {
                Double result = 0d;
                for (Double s : parameter) {//计算两个服务器返回的结果
                    result += s;
                }
                return result;
            }

        });

        fr.onSuccess(new OnSuccess<Double>() {
            @Override
            public void onSuccess(Double result) {
                System.out.println("云计算返回结果-----" + result);
            }
        });
    }

 

还可以让服务器并发处理:把给从1加到50000的任务分成5个线程并行处理:1..10000,10001..20000,20001..30000,30001..40000,40001..50000,这样能更好地提高效率。

如果按上面的方法仅仅是发布多个remote actor:

ActorRef remoteCalcAn = app.getSystem().actorOf(new Props(CalcActor.class)..withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcAn");

是没法提高效率的,因为这时的CalcActor是单线程的,它只会先接收1..10000,处理完后再接收10001..20000并处理。。。。。

使其能够并行处理很简单,创建remoteActor时加上withRoute即可:

ActorRef remoteCalcAn = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(5)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcAn");  //RoundRobinRouter的参数5可以理解为分配5个线程并行处理


代码跟上面基本相同 

    public static void main(String[] args) throws Exception {

        final AkkaServerApp app = new AkkaServerApp("xwc", "127.0.0.1", 6666, "client");
        ActorRef remoteCalcA1 = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(4)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sc", "10.68.3.122", 8888)))), "clientCalcA1");    
        ActorRef remoteCalcB1 = app.getSystem().actorOf(new Props(CalcActor.class).withRouter(new RoundRobinRouter(4)).withDeploy(new Deploy(new RemoteScope(new Address("akka", "sp", "10.68.3.124", 8888)))), "clientCalcB1");
   
        final List<Future<Double>> frs = new ArrayList<Future<Double>>();
        
        Future f1 = akka.pattern.Patterns.ask(remoteCalcA1, "1,10000", 150000);
        Future f2 = akka.pattern.Patterns.ask(remoteCalcA1, "10001,20000", 150000);
        Future f3 = akka.pattern.Patterns.ask(remoteCalcA1, "20001,30000", 150000);
        Future f4 = akka.pattern.Patterns.ask(remoteCalcA1, "30001,40000", 150000);
        Future f5 = akka.pattern.Patterns.ask(remoteCalcB1, "40001,50000", 150000);
        Future f6 = akka.pattern.Patterns.ask(remoteCalcB1, "50001,60000", 150000);
        Future f7 = akka.pattern.Patterns.ask(remoteCalcB1, "60001,70000", 150000);
        Future f8 = akka.pattern.Patterns.ask(remoteCalcB1, "70001,80000", 150000);
        frs.add(f1);
        frs.add(f2);
        frs.add(f3);
        frs.add(f4);
        frs.add(f5);
        frs.add(f6);
        frs.add(f7);
        frs.add(f8);

        Future<Iterable<Double>> future = Futures.sequence(frs, app.getSystem().dispatcher());
        Future<Double> fr = future.map(new Mapper<Iterable<Double>, Double>() {

            @Override
            public Double apply(Iterable<Double> parameter) {
                Double result = 0d;
                for (Double s : parameter) {
                    result += s;
                }
                return result;
            }

        });

        fr.onSuccess(new OnSuccess<Double>() {
            @Override
            public void onSuccess(Double result) {
                System.out.println("云计算返回从1加到80000的结果-----" + result);
            }
        });
    }


http://m.oschina.net/blog/81118
分享到:
评论

相关推荐

    Learning Akka(PACKT,2015)

    Akka is a distributed computing toolkit that enables developers to build correct concurrent and distributed applications using Java and Scala with ease, applications that scale across servers and ...

    Akka.in.Action.2016.9.pdf

    Akka in Action shows you how to build message-oriented systems with Akka. This comprehensive, hands-on tutorial introduces each concept with a working example. You’ll start with the big picture of ...

    FIABTurn​​table:使用Guice + Akka C2库(C2Akka)的FIABTurn​​table实现

    FIABTurn​​table:使用Guice + Akka C2库(C2Akka)的FIABTurn​​table实现

    AKKA 本质 《Akka Essentials》

    Akka Essentials,学习akka很好的一本书

    akka-kryo-serialization, 基于Kryo的Akka序列化.zip

    akka-kryo-serialization, 基于Kryo的Akka序列化 akka-kryo-serialization-- Scala 和Akka基于kryo的序列化程序这个库为 Scala 和Akka提供定制的基于kryo的序列化程序。 它可以用于更高效的akka远程处理。它还可以...

    Akka 基础学习pdf中文文档

    第 2 章 Actor 与并发:响应式编程。Actor 与 Future 的使用。 第 3 章 传递消息:消息传递模式。 第 4 章 Actor 的生命周期—处理状态与错误:Actor 生命周期、监督机制、Stash/ Unstash、Become/Unbecome 以及有限...

    Akka 实战 akka in action v13 2014版本

    akka 实战。akka in action。v13 2014新版。 互联网技术入门必备 清晰,非扫描。

    akka_2.10-2.314

    akka_2.10

    akka java实现tcp远程调用

    akka实例 java实现tcp远程调用,一个服务端,一个客户端

    akka-actor-2.11-2.5.19-API文档-中文版.zip

    赠送jar包:akka-actor_2.11-2.5.19.jar; 赠送原API文档:akka-actor_2.11-2.5.19-javadoc.jar; 赠送源代码:akka-actor_2.11-2.5.19-sources.jar; 赠送Maven依赖信息文件:akka-actor_2.11-2.5.19.pom; 包含...

    Akka入门与实践

    第 2 章 Actor 与并发:响应式编程。Actor 与 Future 的使用。 第 3 章 传递消息:消息传递模式。 第 4 章 Actor 的生命周期—处理状态与错误:Actor 生命周期、监督机制、Stash/ Unstash、Become/Unbecome 以及有限...

    Learning Akka

    Learning Akka Learning Akka Learning AkkaLearning Akka

    Akka-in-Action.pdf

    In March 2010 I noticed a tweet by Dean Wampler that made me look into Akka: W00t! RT @jboner: #akka 0.7 is released: http://bit.ly/9yRGSB After some investigation into the source code and building a ...

    akkajava.pdf

    Akka is Open Source and available under the Apache 2 License. Download from http://akka.io/downloads. Please note that all code samples compile, so if you want direct access to the sources, have a ...

    akka实例参考

    初学akka使用实例,有很好的帮助啊,可实际运行

    akka-data-replication, 在Akka集群中,复制 CRDTs.zip

    akka-data-replication, 在Akka集群中,复制 CRDTs Akka分布式数据这个库的( akka-data-replication ) 已经包含在Akka中,在模块中分发数据。英镑不在/akka-data-replication中维护。 所有 Bug 修复和新功能将在 ...

    Akka应用模式-分布式应用程序设计实践指南.pdf

    另外,本书介绍了 Actor 模型的一个实现框架 Akka 以及它的工具,而后讨论了在充分利用 actor 架构的基础上使用 Akka 框架来设计软件系统的方法,以及使用它来开发并发性和分布式应用程序的方怯。本书还介绍了领域 ...

    Applied.Akka.Patterns

    Chapter 2. Introducing Akka Chapter 3. Distributed Domain-Driven Design Chapter 4. Good Actor Design Chapter 5. Good Data Flow Chapter 6. Consistency and Scalability Chapter 7. Fault Tolerance Chapter...

    akka学习入门实践

    akka学习入门实践

    akka-stream_2.11-2.5.21-API文档-中英对照版.zip

    赠送jar包:akka-stream_2.11-2.5.21.jar; 赠送原API文档:akka-stream_2.11-2.5.21-javadoc.jar; 赠送源代码:akka-stream_2.11-2.5.21-sources.jar; 赠送Maven依赖信息文件:akka-stream_2.11-2.5.21.pom; ...

Global site tag (gtag.js) - Google Analytics