码迷,mamicode.com
首页 > 其他好文 > 详细

AWS学习笔记(七)--集成SQS和Lambda

时间:2018-02-23 13:29:51      阅读:499      评论:0      收藏:0      [点我收藏+]

标签:led   services   oid   方式   cloud   reg   mod   dea   处理   

本文介绍了集成SQS和Lambda的方法,代码基于AWS SDK for Java。

POM配置

<!-- Dependencies used by the SQS utility class and the Lambda handlers in this article. -->
<dependencies>  
    <!-- SQS client from the AWS SDK for Java. No <version> is given here;
         presumably it is resolved through the aws-java-sdk-bom imported in
         <dependencyManagement> below. -->
    <dependency>  
        <artifactId>aws-java-sdk-sqs</artifactId>  
        <groupId>com.amazonaws</groupId>  
    </dependency>  
    <!-- Lambda programming-model library (pinned explicitly to 1.2.0). -->
    <dependency>  
        <groupId>com.amazonaws</groupId>  
        <artifactId>aws-lambda-java-core</artifactId>  
        <version>1.2.0</version>  
    </dependency>  
    <!-- Predefined AWS event types for Lambda handlers (pinned explicitly to 2.0.2). -->
    <dependency>  
        <groupId>com.amazonaws</groupId>  
        <artifactId>aws-lambda-java-events</artifactId>  
        <version>2.0.2</version>  
    </dependency>  
    <!-- Log4j2 integration for Lambda (pinned explicitly to 1.1.0);
         not referenced by the sample code shown in this article. -->
    <dependency>  
        <groupId>com.amazonaws</groupId>  
        <artifactId>aws-lambda-java-log4j2</artifactId>  
        <version>1.1.0</version>  
    </dependency>  
</dependencies>  

<!-- Imports the AWS SDK bill of materials (version 1.11.272) with scope "import". -->
<dependencyManagement>  
    <dependencies>  
        <dependency>  
            <groupId>com.amazonaws</groupId>  
            <artifactId>aws-java-sdk-bom</artifactId>  
            <version>1.11.272</version>  
            <type>pom</type>  
            <scope>import</scope>  
        </dependency>  
    </dependencies>  
</dependencyManagement>

SQS

中国区目前仅支持标准Queue,不支持FIFO Queue,下面代码以标准Queue为例,演示了创建Queue、配置Dead Letter Queue、发送Message、接收Message、删除Message、删除Queue的方法:

import com.amazonaws.regions.Regions;  
import com.amazonaws.services.sqs.AmazonSQS;  
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;  
import com.amazonaws.services.sqs.model.*;  

import java.util.*;  

/**
 * Static helper methods for working with standard Amazon SQS queues through a
 * single shared {@link AmazonSQS} client bound to the {@code CN_NORTH_1} region.
 *
 * <p>Covers creating a queue, wiring up a dead letter queue, sending a message,
 * receiving (and deleting) messages, deleting a queue and looking up a queue ARN.
 * The class cannot be instantiated.
 */
public final class SqsUtil {
    /** Name of the queue attribute that holds the queue ARN. */
    private static final String ARN_ATTRIBUTE_NAME = "QueueArn";

    /** Name of the sample message attribute written by sendMessage and requested by receiveMessages. */
    private static final String GREETING_ATTRIBUTE_NAME = "Hello";

    /** Shared client, built once when the class is initialized. */
    private static final AmazonSQS sqs =
            AmazonSQSClientBuilder.standard().withRegion(Regions.CN_NORTH_1).build();

    private SqsUtil() {
    }

    /**
     * Creates a queue with the given name.
     *
     * @param queueName name of the queue to create
     * @return the URL of the created queue
     */
    public static String createQueue(String queueName) {
        System.out.println("Creating a new SQS queue called " + queueName);

        Map<String, String> attributes = new HashMap<>();
        // Wait time (in seconds) applied when receiving messages from this queue.
        attributes.put("ReceiveMessageWaitTimeSeconds", "5");
        CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName).withAttributes(attributes);

        return sqs.createQueue(createQueueRequest).getQueueUrl();
    }

    /**
     * Creates a queue intended to be used as a dead letter queue.
     *
     * @param queueName name of the queue to create
     * @return the ARN of the created queue (the ARN, not the URL, is what
     *         {@link #configDeadLetterQueue(String, String)} expects)
     */
    public static String createDeadLetterQueue(String queueName) {
        String queueUrl = createQueue(queueName);
        // The dead letter queue is referenced by ARN when it is configured.
        return getQueueArn(queueUrl);
    }

    /**
     * Sets the redrive policy of a queue so that it uses the given dead letter queue.
     *
     * @param queueUrl           URL of the queue to configure
     * @param deadLetterQueueArn ARN of the dead letter queue
     */
    public static void configDeadLetterQueue(String queueUrl, String deadLetterQueueArn) {
        System.out.println("Config dead letter queue for " + queueUrl);

        Map<String, String> attributes = new HashMap<>();
        // Maximum receive count is 5: once a message has been received more than
        // 5 times without being processed and deleted, it is moved to the dead letter queue.
        attributes.put("RedrivePolicy", "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"" + deadLetterQueueArn + "\"}");

        SetQueueAttributesRequest queueAttributes = new SetQueueAttributesRequest();
        queueAttributes.setAttributes(attributes);
        queueAttributes.setQueueUrl(queueUrl);

        sqs.setQueueAttributes(queueAttributes);
    }

    /**
     * Sends one message carrying a sample string attribute.
     *
     * @param queueUrl URL of the destination queue
     * @param message  message body
     */
    public static void sendMessage(String queueUrl, String message) {
        System.out.println("Sending a message to " + queueUrl);

        Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
        // A message attribute must carry both a DataType and a value.
        messageAttributes.put(GREETING_ATTRIBUTE_NAME,
                new MessageAttributeValue().withDataType("String").withStringValue("COCO"));

        SendMessageRequest request = new SendMessageRequest()
                .withQueueUrl(queueUrl)
                .withMessageBody(message)
                .withMessageAttributes(messageAttributes);

        sqs.sendMessage(request);
    }

    /**
     * Issues a single receive call (at most 5 messages, waiting up to 10 seconds),
     * prints every returned message with its attributes and deletes each message
     * right after printing it.
     *
     * @param queueUrl URL of the queue to read from
     */
    public static void receiveMessages(String queueUrl) {
        System.out.println("Receiving messages from " + queueUrl);

        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
        receiveMessageRequest.setMaxNumberOfMessages(5);
        receiveMessageRequest.withWaitTimeSeconds(10);
        // The attribute name has to be listed on the request; per the original
        // author's note the attribute is not received otherwise.
        receiveMessageRequest.setMessageAttributeNames(Collections.singletonList(GREETING_ATTRIBUTE_NAME));

        List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
        for (Message message : messages) {
            System.out.println("Message: " + message.getBody());
            for (Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
                System.out.println("  Attribute");
                System.out.println("    Name:  " + entry.getKey());
                System.out.println("    Value: " + entry.getValue().getStringValue());
            }

            // Delete the message using the receipt handle of this receive.
            System.out.println("Deleting a message.");
            String messageReceiptHandle = message.getReceiptHandle();
            sqs.deleteMessage(new DeleteMessageRequest(queueUrl, messageReceiptHandle));
        }
    }

    /**
     * Deletes the queue with the given URL.
     *
     * @param queueUrl URL of the queue to delete
     */
    public static void deleteQueue(String queueUrl) {
        System.out.println("Deleting the queue " + queueUrl);
        sqs.deleteQueue(new DeleteQueueRequest(queueUrl));
    }

    /**
     * Looks up the ARN of a queue.
     *
     * @param queueUrl URL of the queue
     * @return the value of the queue's {@code QueueArn} attribute
     */
    public static String getQueueArn(String queueUrl) {
        List<String> attributeNames = Collections.singletonList(ARN_ATTRIBUTE_NAME);
        GetQueueAttributesResult queueAttributes = sqs.getQueueAttributes(queueUrl, attributeNames);
        return queueAttributes.getAttributes().get(ARN_ATTRIBUTE_NAME);
    }

}

在运行上面代码前,要在{HOME}/.aws目录下配置credentials,用户要有SQS权限:
[default]
aws_access_key_id = AAAAAAAAAAAAAA
aws_secret_access_key = MXXXXXXXXXXXXXXXXXXXXXX9

测试一下:

// Create the dead letter queue and keep its ARN
String dlqArn = createDeadLetterQueue("DeadLetterQueue");
// Create the task queue
String taskQueueUrl = createQueue("TaskQueue");
// Attach the dead letter queue to the task queue
configDeadLetterQueue(taskQueueUrl, dlqArn);
// Send six messages, numbered 0 through 5
int sent = 0;
while (sent < 6) {
    sendMessage(taskQueueUrl, "Hello COCO " + sent);
    sent++;
}
// Receive messages from the task queue
receiveMessages(taskQueueUrl);
// Delete the task queue
deleteQueue(taskQueueUrl);

Lambda

Function Code

Lambda函数定义支持两种方式:

  • 实现预定义接口RequestStreamHandler 或 RequestHandler
    import com.amazonaws.services.lambda.runtime.RequestHandler;  
    import com.amazonaws.services.lambda.runtime.Context;
    public class Hello implements RequestHandler<Request, Response> {  
    // Request,Response为自定义的类型  
    public Response handleRequest(Request request, Context context) {  
        String greetingString = String.format("Hello %s %s.", request.firstName, request.lastName);  
        return new Response(greetingString);  
    }  
    }
  • 不实现任何接口,直接定义处理程序方法
    outputType handler-name(inputType input, Context context) {  
    ...  
    }

inputType 和 outputType 可为以下类型之一:

  • Java 基本类型及 String 等简单类型(如 String 或 int)。
  • aws-lambda-java-events 库中的预定义 AWS 事件类型。 如S3Event。
  • 自己的 POJO 类。AWS Lambda 会根据该 POJO 类型自动序列化和反序列化输入、输出 JSON。

如不需要,可以省略处理程序方法签名中的 Context 对象。

先编写一个简单的测试用例接收SQS消息,输入参数input为Queue URL:

import com.amazonaws.services.lambda.runtime.Context;  
import com.amazonaws.services.lambda.runtime.LambdaLogger;  
import com.amazonaws.services.lambda.runtime.RequestHandler;  

/**
 * Test handler whose input is a queue URL: logs the input, then receives
 * messages from that queue through {@code SqsUtil}.
 */
public class Hello implements RequestHandler<String, String> {

    /**
     * @param input   URL of the queue to read from
     * @param context Lambda invocation context, used to obtain the logger
     * @return the literal string {@code "success"}
     */
    @Override
    public String handleRequest(String input, Context context) {
        final LambdaLogger lambdaLog = context.getLogger();
        final String logLine = "received : " + input;
        lambdaLog.log(logLine);

        SqsUtil.receiveMessages(input);

        return "success";
    }
}

程序编写完了,如何放入到Lambda函数中呢?需要打成jar包,且须包含依赖包,pom中增加shade插件:

<!-- Packages the function as a jar that also contains its dependencies,
     as required for uploading to Lambda. -->
<build>  
    <plugins>  
        <plugin>  
            <groupId>org.apache.maven.plugins</groupId>  
            <artifactId>maven-shade-plugin</artifactId>  
            <version>3.1.0</version>  
            <configuration>  
                <!-- Do not generate a dependency-reduced POM. -->
                <createDependencyReducedPom>false</createDependencyReducedPom>  
            </configuration>  
            <executions>  
                <!-- Run the shade goal during the package phase. -->
                <execution>  
                    <phase>package</phase>  
                    <goals>  
                        <goal>shade</goal>  
                    </goals>  
                </execution>  
            </executions>  
        </plugin>  
    </plugins>  
</build>

创建Lambda Function

下面通过Web Console创建Lambda Function
技术分享图片
注意:role要有Lambda、CloudWatch Logs、SQS权限。

然后上传jar包,配置Handler
技术分享图片
再调整一下内存配置和超时参数,保存。
技术分享图片
配置测试参数,测试一下先:
技术分享图片
执行成功输出:
技术分享图片

Lambda触发器

下面修改一下代码,输入参数类型改为ScheduledEvent,将使用触发器CloudWatch Events调用。

import com.amazonaws.services.lambda.runtime.Context;  
import com.amazonaws.services.lambda.runtime.LambdaLogger;  
import com.amazonaws.services.lambda.runtime.RequestHandler;  
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;  

/**
 * Handler taking a {@code ScheduledEvent} as input: logs the event, then
 * receives messages from a fixed task queue through {@code SqsUtil}.
 */
public class Hello implements RequestHandler<ScheduledEvent, String> {
    /** URL of the queue polled on every invocation. */
    private static final String TASK_QUEUE_URL = "https://sqs.cn-north-1.amazonaws.com.cn/891245999999/TaskQueue";

    /**
     * @param input   the event that triggered this invocation
     * @param context Lambda invocation context, used to obtain the logger
     * @return the literal string {@code "success"}
     */
    @Override
    public String handleRequest(ScheduledEvent input, Context context) {
        final LambdaLogger lambdaLog = context.getLogger();
        final String logLine = "received : " + input.toString() + "\n";
        lambdaLog.log(logLine);

        SqsUtil.receiveMessages(TASK_QUEUE_URL);

        return "success";
    }
}

上传后,同样先手工测试一下,这次选择模板Scheduled Event
技术分享图片
测试成功后,配置CloudWatch Events触发器,Rule Type选择Schedule expression:
技术分享图片
保存后就可以定时调用Lambda了。

Integrate SQS and Lambda: serverless architecture for asynchronous workloads
Amazon Simple Queue Service Developer Guide
AWS Lambda Developer Guide
Programming Model for Authoring Lambda Functions in Java
AWS SDK for Java Developer Guide
Schedule Expressions Using Rate or Cron
AWS 视频中心
AWS微服务和无服务器架构入门
快速理解AWS Lambda,轻松构建Serverless后台
用无服务器应用模型构建AWS Lambda应用
如何通过运行无服务器来满足企业需求
无服务器架构设计模式和最佳实践

AWS学习笔记(七)--集成SQS和Lambda

标签:led   services   oid   方式   cloud   reg   mod   dea   处理   

原文地址:http://blog.51cto.com/7308310/2072301

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!