码迷,mamicode.com
首页 > Web开发 > 详细

webFlux 学习(二)

时间:2019-07-17 11:10:20      阅读:163      评论:0      收藏:0      [点我收藏+]

标签:不能   创建   unit   ons   sync   oda   div   stc   扩展   

webFlux

webFlux 是spring5提出的,一个非阻塞,运行在netty或者Servlet3.1之上,

MVC和webFlux 有什么关系呢?

技术图片

1.阻塞和非阻塞

webflux 是一个非阻塞的模式 可以在一个线程里可以处理更多的请求

传统的mvc是一个阻塞的开发模式 一个请求对应我们容器里的一个线程 

2.运行环境

mvc 是基于servlet api 所以必须运行servlet 容器上面

webflux是基于响应式流,它可以运行servlet 或者netty 上面

3.数据方面

关系数据暂时不能使用webflux

优势

1.支持高并发量 水平扩展/垂直扩展,我们可以使用webflux 进行垂直扩展.

servlet

idea 创建servlet https://blog.csdn.net/a376298333/article/details/79121548

为什么要使用异步servlet?

@WebServlet(urlPatterns = { "/AsyncServlet" },asyncSupported = true)
public class AyncServlet extends HttpServlet {
    protected void doPost(javax.servlet.http.HttpServletRequest request, javax.servlet.http.HttpServletResponse response) throws javax.servlet.ServletException, IOException {

    }

    protected void doGet(javax.servlet.http.HttpServletRequest request, javax.servlet.http.HttpServletResponse response) throws javax.servlet.ServletException, IOException {
        long start = System.currentTimeMillis();
        //1.开启异步
        AsyncContext asyncContext = request.startAsync();
        //2.执行耗时操作
        CompletableFuture.runAsync(()->{
            something(asyncContext,asyncContext.getRequest(),asyncContext.getResponse());
        });


        System.out.println("aynctime use: "+ (System.currentTimeMillis()-start));
    }

    private void something(AsyncContext asyncContext, ServletRequest request, ServletResponse response)  {
        //模拟耗时操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            response.getWriter().append("aync done");
        } catch (IOException e) {
            e.printStackTrace();
        }
        //业务代码处理完.我们要通知他
        asyncContext.complete();
    }
}

aynctime use: 11

这是一个异步的servlet http://localhost:8080/AsyncServlet 访问在前台来看还是5秒钟

但是后台的时间是11ms 不会阻塞tomcat线程 可以把一些耗时的操作放在独立线程池里,我们的serlet线程就可以处理下一个线程达到比较高的吞吐量

同步servlet阻塞了什么?

/**
 * @Created by xiaodao
 */
@WebServlet("/SyncServlet")
public class SyncServlet extends javax.servlet.http.HttpServlet {
    protected void doPost(javax.servlet.http.HttpServletRequest request, javax.servlet.http.HttpServletResponse response) throws javax.servlet.ServletException, IOException {

    }

    protected void doGet(javax.servlet.http.HttpServletRequest request, javax.servlet.http.HttpServletResponse response) throws javax.servlet.ServletException, IOException {
        long start = System.currentTimeMillis();
        System.out.println(start);
        something(request,response)
        ;

        System.out.println("time use: "+ (System.currentTimeMillis()-start));
    }

    private void something(HttpServletRequest request, HttpServletResponse response) throws IOException {
        //模拟耗时操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        response.getWriter().append("done");
    }
}

time use :5001

在浏览器中访问可以看到我们返回需要5m中.这时候我们就知道

同步的servlet阻塞了tomcat容器的servlet线程,

当我们的网络请求->tomcat容器->为每一个请求启动一个线程去处理->线程里找一个servlet线程来处理

异步servlet怎么工作的?

1.我们开启异步支持

2.把页面代码放到独立的线程池执行

3.调用异步上下文的comlate方法通知他结束

webFlux 入门

什么是reactor编程呢?就是jdk8 stream + jdk11 reactive stream 

mono: 0-1个元素

flux:0-N个元素

我们来一个示例程序看下:

/**
 * @Created by xiaodao
 */
public class Main {

    public static void main(String[] args) {
        String[] arr = {"1","2","3","4"};
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            private Subscription subscription;
            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription =subscription;
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {

            }
        };
        //jdk8 stream 这个时候并没有执行
        Flux.fromArray(arr).map(s->Integer.parseInt(s))
                .subscribe(subscriber);//jdk9 的 reactive stream
    }
}

jdk8 的流可以看做是一个发布者

jdk9的subscribe可以看做是一个订阅者.

来看下webflux是如何执行的?

@RestController
@Slf4j
public class TestController {


    @GetMapping("/test1")
    public String test1(){
        log.info("start");
        String str  = createStr();
        log.info("end");
        return  str;
    }

    @GetMapping("/test2")
    public Mono<String> test2(){
        log.info("mono start");
        Mono<String> stringMono = Mono.fromSupplier(() -> createStr());
        log.info("mono end ");
        return stringMono;
    }


    public String createStr(){
        try {

            TimeUnit.SECONDS.sleep(5);


        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello web flux ";
    }
}

上面的代码是一个简单的操作我们模拟业务需求让代码耗时5秒钟的时间

在浏览器中分别访问test1 test2 在浏览器中都是等待5秒钟,这点都都一样,对于浏览器来说不存在同步和异步之说,阻塞和非阻塞,同步与异步只存在于服务端

接下来我们来看来在spring中是如何执行的.

技术图片

 

我们可以看到当我们访问test1 的时候test1 controller方法就占用了5秒钟的时间

第二种模式,controller是基本没有耗时的,我们新模式中返回的mono实际上是返回了一个流,当调用subscibe()方法的时候才会执行 

flux

   @GetMapping(value = "/test3",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> test3(){
        log.info("mono start");

        Flux<String> stringMono = Flux.fromStream(IntStream.range(1,5).mapToObj(i->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "flux data "+i+"/n";
        }));
        return stringMono;
    }

 

webFlux 学习(二)

标签:不能   创建   unit   ons   sync   oda   div   stc   扩展   

原文地址:https://www.cnblogs.com/bj-xiaodao/p/11046716.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!