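This article shows how to integrate SQS and Lambda. The code is based on the AWS SDK for Java, and the Maven dependencies are as follows:
<dependencies>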
<dependency>
<artifactId>aws-java-sdk-sqs</artifactId>
<groupId>com.amazonaws</groupId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-log4j2</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>1.11.272</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
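At the time of writing, the China regions support only standard queues, not FIFO queues. The code below uses a standard queue to demonstrate creating a queue, configuring a dead letter queue, sending, receiving, and deleting messages, and deleting the queue: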
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.*;
import java.util.*;
public class SqsUtil {
private static final String ARN_ATTRIBUTE_NAME = "QueueArn";
private static AmazonSQS sqs;
static {
sqs = AmazonSQSClientBuilder.standard().withRegion(Regions.CN_NORTH_1).build();
}
private SqsUtil() {
}
public static String createQueue(String queueName) {
System.out.println("Creating a new SQS queue called " + queueName);
CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName);
Map<String, String> attributes = new HashMap<>();
// Wait time when receiving messages (long polling), in seconds
attributes.put("ReceiveMessageWaitTimeSeconds", "5");
createQueueRequest.withAttributes(attributes);
return sqs.createQueue(createQueueRequest).getQueueUrl();
}
public static String createDeadLetterQueue(String queueName) {
String queueUrl = createQueue(queueName);
// The ARN is needed when configuring the dead letter queue
return getQueueArn(queueUrl);
}
public static void configDeadLetterQueue(String queueUrl, String deadLetterQueueArn) {
System.out.println("Config dead letter queue for " + queueUrl);
SetQueueAttributesRequest queueAttributes = new SetQueueAttributesRequest();
Map<String, String> attributes = new HashMap<>();
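// Set the maximum receive count to 5; a message received more than 5 times without being processed and deleted is moved to the dead letter queue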
attributes.put("RedrivePolicy", "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"" + deadLetterQueueArn + "\"}");
queueAttributes.setAttributes(attributes);
queueAttributes.setQueueUrl(queueUrl);
sqs.setQueueAttributes(queueAttributes);
}
public static void sendMessage(String queueUrl, String message) {
System.out.println("Sending a message to " + queueUrl);
SendMessageRequest request = new SendMessageRequest();
request.withQueueUrl(queueUrl);
request.withMessageBody(message);
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
// Add a message attribute; both DataType and a value are required
messageAttributes.put("Hello", new MessageAttributeValue().withDataType("String").withStringValue("COCO"));
request.withMessageAttributes(messageAttributes);
sqs.sendMessage(request);
}
public static void receiveMessages(String queueUrl) {
System.out.println("Receiving messages from " + queueUrl);
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
receiveMessageRequest.setMaxNumberOfMessages(5);
receiveMessageRequest.withWaitTimeSeconds(10);
// MessageAttributeNames must be set, otherwise the message attributes are not returned
receiveMessageRequest.setMessageAttributeNames(Arrays.asList("Hello"));
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println("Message: " + message.getBody());
for (Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue().getStringValue());
}
// Delete message
System.out.println("Deleting a message.");
String messageReceiptHandle = message.getReceiptHandle();
sqs.deleteMessage(new DeleteMessageRequest(queueUrl, messageReceiptHandle));
}
}
public static void deleteQueue(String queueUrl) {
System.out.println("Deleting the queue " + queueUrl);
sqs.deleteQueue(new DeleteQueueRequest(queueUrl));
}
public static String getQueueArn(String queueUrl) {
List<String> attributes = new ArrayList<>();
attributes.add(ARN_ATTRIBUTE_NAME);
GetQueueAttributesResult queueAttributes = sqs.getQueueAttributes(queueUrl, attributes);
return queueAttributes.getAttributes().get(ARN_ATTRIBUTE_NAME);
}
}
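The RedrivePolicy value in configDeadLetterQueue is assembled by hand as a JSON string, which is easy to get wrong. A minimal sketch of a small helper that produces the same string is shown here. The class name RedrivePolicyJson and the range check are assumptions for illustration and are not part of the SDK.

```java
/** Builds the JSON string used as the value of the SQS "RedrivePolicy" queue attribute. */
final class RedrivePolicyJson {

    private RedrivePolicyJson() {
    }

    /**
     * @param maxReceiveCount    number of receives allowed before the message goes to the dead letter queue
     * @param deadLetterQueueArn ARN of the dead letter queue
     */
    static String build(int maxReceiveCount, String deadLetterQueueArn) {
        if (maxReceiveCount < 1) {
            throw new IllegalArgumentException("maxReceiveCount must be at least 1");
        }
        // Same layout as the hand-written string in configDeadLetterQueue.
        return "{\"maxReceiveCount\":\"" + maxReceiveCount + "\", "
                + "\"deadLetterTargetArn\":\"" + escape(deadLetterQueueArn) + "\"}";
    }

    // Escapes backslashes and double quotes so the ARN cannot break the JSON string literal.
    private static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

With this helper, the attribute could be set as attributes.put("RedrivePolicy", RedrivePolicyJson.build(5, deadLetterQueueArn)).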
Before running the code above, configure the credentials file in the {HOME}/.aws directory. The user must have SQS permissions:
[default]
aws_access_key_id = AAAAAAAAAAAAAA
aws_secret_access_key = MXXXXXXXXXXXXXXXXXXXXXX9
A quick test:
// Create the dead letter queue
String deadLetterQueueArn = createDeadLetterQueue("DeadLetterQueue");
// Create the task queue
String queueUrl = createQueue("TaskQueue");
// Configure the dead letter queue
configDeadLetterQueue(queueUrl, deadLetterQueueArn);
// Send messages
for (int i = 0; i < 6; i++) {
sendMessage(queueUrl, "Hello COCO " + i);
}
// Receive messages
receiveMessages(queueUrl);
// Delete the queue
deleteQueue(queueUrl);
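In the test above every received message is deleted, so nothing ever reaches the dead letter queue. To illustrate what maxReceiveCount does when a message is not deleted, here is a rough in-memory model. It is only a simulation under simplifying assumptions (no visibility timeout, message bodies are unique and serve as identifiers), and it is not how SQS is implemented. The class RedriveSimulation is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of a queue with a redrive policy; a teaching aid, not SQS itself. */
final class RedriveSimulation {
    private final int maxReceiveCount;
    private final Deque<String> queue = new ArrayDeque<>();
    private final Map<String, Integer> receiveCounts = new HashMap<>();
    private final List<String> deadLetters = new ArrayList<>();

    RedriveSimulation(int maxReceiveCount) {
        this.maxReceiveCount = maxReceiveCount;
    }

    void send(String body) {
        queue.addLast(body);
    }

    /** Returns the next message, or null when no message is available. */
    String receive() {
        while (!queue.isEmpty()) {
            String body = queue.pollFirst();
            int count = receiveCounts.merge(body, 1, Integer::sum);
            if (count > maxReceiveCount) {
                // Received too often without being deleted: move it to the dead letter queue.
                deadLetters.add(body);
                receiveCounts.remove(body);
                continue;
            }
            // Not deleted yet, so the message stays in the queue and can be received again.
            queue.addLast(body);
            return body;
        }
        return null;
    }

    /** Deleting a message after successful processing removes it for good. */
    void delete(String body) {
        queue.remove(body);
        receiveCounts.remove(body);
    }

    List<String> deadLetters() {
        return deadLetters;
    }
}
```

With maxReceiveCount set to 5, a message that is received five times and never deleted is moved to the dead letter list on the sixth receive attempt, which matches the policy configured in configDeadLetterQueue.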
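A Lambda function handler can be defined in two ways. The first is to implement a predefined interface such as RequestHandler: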
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.Context;
public class Hello implements RequestHandler<Request, Response> {
// Request and Response are custom types
public Response handleRequest(Request request, Context context) {
String greetingString = String.format("Hello %s %s.", request.firstName, request.lastName);
return new Response(greetingString);
}
}
The second is to write a custom handler method with the following general signature:
outputType handler-name(inputType input, Context context) {
...
}
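inputType and outputType can each be a primitive Java type (such as String or int), a predefined AWS event type from the aws-lambda-java-events library, or your own POJO class.
If it is not needed, the Context object can be omitted from the handler method signature.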
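As a small illustration of the second style, the sketch below defines a handler that implements no interface and leaves out the Context parameter. The class name HelloPojo is hypothetical. Because the class has no package, the handler setting for it would be HelloPojo::handleRequest; with a package it would take the form package.Class::method.

```java
/** A handler that does not implement RequestHandler and omits the Context parameter. */
public class HelloPojo {

    // Lambda deserializes the JSON test event (for example the JSON string "COCO") into the String argument.
    public String handleRequest(String name) {
        return String.format("Hello %s.", name);
    }
}
```

Since the method depends on no Lambda types, it can also be called directly from a unit test.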
First, write a simple test handler that receives SQS messages. The input parameter is the queue URL:
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
public class Hello implements RequestHandler<String, String> {
@Override
public String handleRequest(String input, Context context) {
LambdaLogger logger = context.getLogger();
logger.log("received : " + input);
SqsUtil.receiveMessages(input);
return "success";
}
}
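With the program written, how does it get into the Lambda function? It must be packaged as a jar that includes its dependencies, so add the shade plugin to the pom: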
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
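Next, create the Lambda function through the web console.
Note: the role needs Lambda, CloudWatch Logs, and SQS permissions.
Then upload the jar and configure the handler.
Adjust the memory and timeout settings, then save.
Configure a test event and run a test first:
Output on successful execution:
Next, modify the code: change the input parameter type to ScheduledEvent so that the function can be invoked by a CloudWatch Events trigger.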
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
public class Hello implements RequestHandler<ScheduledEvent, String> {
@Override
public String handleRequest(ScheduledEvent input, Context context) {
LambdaLogger logger = context.getLogger();
logger.log("received : " + input.toString() + "\n");
SqsUtil.receiveMessages("https://sqs.cn-north-1.amazonaws.com.cn/891245999999/TaskQueue");
return "success";
}
}
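The handler above hard-codes the queue URL. One possible alternative, sketched here, is to read it from a Lambda environment variable so that the same jar works with different queues. The variable name TASK_QUEUE_URL and the class QueueConfig are assumptions for illustration; the variable would have to be defined in the function's configuration.

```java
import java.util.Map;

/** Resolves the queue URL from environment variables instead of a hard-coded constant. */
final class QueueConfig {

    // Hypothetical variable name; it must match what is configured on the Lambda function.
    static final String QUEUE_URL_VARIABLE = "TASK_QUEUE_URL";

    private QueueConfig() {
    }

    /** Takes the environment as a map so the lookup can be tested without real environment variables. */
    static String queueUrl(Map<String, String> environment) {
        String url = environment.get(QUEUE_URL_VARIABLE);
        if (url == null || url.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + QUEUE_URL_VARIABLE + " is not set");
        }
        return url.trim();
    }
}
```

Inside handleRequest, the call would then become SqsUtil.receiveMessages(QueueConfig.queueUrl(System.getenv())).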
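After uploading, again test manually first, this time choosing the Scheduled Event template.
Once the test succeeds, configure the CloudWatch Events trigger, choosing Schedule expression as the rule type:
After saving, the Lambda function is invoked on the schedule.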
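A schedule expression is either a rate expression such as rate(5 minutes) or a cron expression. Rate expressions use the singular unit when the value is 1 and the plural otherwise, which is easy to trip over. The helper below is a minimal sketch of that rule; the class name ScheduleExpressions is hypothetical and the helper only formats the string, it does not talk to AWS.

```java
import java.util.concurrent.TimeUnit;

/** Formats CloudWatch Events rate expressions such as "rate(5 minutes)". */
final class ScheduleExpressions {

    private ScheduleExpressions() {
    }

    static String rate(int value, TimeUnit unit) {
        if (value < 1) {
            throw new IllegalArgumentException("value must be a positive number");
        }
        String name;
        switch (unit) {
            case MINUTES:
                name = "minute";
                break;
            case HOURS:
                name = "hour";
                break;
            case DAYS:
                name = "day";
                break;
            default:
                // Rate expressions only accept minutes, hours, and days.
                throw new IllegalArgumentException("Unsupported unit: " + unit);
        }
        // Singular unit for a value of 1, plural otherwise.
        return "rate(" + value + " " + (value == 1 ? name : name + "s") + ")";
    }
}
```

For example, ScheduleExpressions.rate(5, TimeUnit.MINUTES) yields a string that can be pasted into the Schedule expression field of the trigger.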
Original article: http://blog.51cto.com/7308310/2072301