当前位置: 首页 > news >正文

使用亚马逊SQS实现一个队列任务,包括:向队列发送消息和从队列中读取消息

SQS

Amazon Simple Queue Service (SQS)是一种完全托管式消息队列服务,让您可以分离和扩展微服务、分布式系统和无服务器应用程序。
SQS消除了与管理和操作面向消息的中间件相关的复杂性和开销,并使开发人员能够专注于差异化工作。使用SQS,您可以发送、存储和接收任何卷中的软件组件之间的信息,而不会丢失任何信息,也无需使用其他服务。使用亚马逊云科技管理控制台、您选择的命令行界面或SDK 以及三个简单的命令,在几分钟内即可开始使用 SQS。

SQS 提供两种类型的消息队列。标准队列可提供最大的吞吐量、最优排序和至少一次交付。SQS FIFO
队列旨在保证按照发送消息的确切顺序一次处理好消息。

使用亚马逊SQS实现队列任务:

import logging
import boto3
from botocore.exceptions import ClientError
import jsonlogger = logging.getLogger(__name__)AWS_REGION_NAME = "" # 域名名称
AWS_ACCESS_KEY_ID = "" # aws_access_key_id
AWS_SECRET_ACCESS_KEY = "" # aws_secret_access_key# 创建SQS客户端
sqs = boto3.client('sqs',region_name=AWS_REGION_NAME,aws_access_key_id=AWS_ACCESS_KEY_ID,aws_secret_access_key=AWS_SECRET_ACCESS_KEY)# 队列URL
queue_url = 'https://sqs.cn-northwest-1.amazonaws.com.cn/aws_account_id/queue_name'# 发送消息到队列
def send_message(messages):try:response = sqs.send_message(QueueUrl=queue_url,MessageBody=messages,# 可选的消息属性MessageAttributes={'Attribute1': {'StringValue': 'Value1','DataType': 'String'}})if "Successful" in response:for msg_meta in response["Successful"]:logger.info("Message sent: %s: %s",msg_meta["MessageId"],messages[int(msg_meta["Id"])]["body"])if "Failed" in response:for msg_meta in response["Failed"]:logger.warning("Failed to send: %s: %s",msg_meta["MessageId"],messages[int(msg_meta["Id"])]["body"])except ClientError as error:logger.exception("Send messages failed to queue: %s", sqs)raise errorelse:return response# 批量发送消息
def batch_send_message(entries):try:response = sqs.send_message_batch(QueueUrl=queue_url,Entries=entries  # 最多支持10条消息)if "Successful" in response:for msg_meta in response["Successful"]:logger.info("Message sent: %s: %s",msg_meta["MessageId"],# messages[int(msg_meta["Id"])]["body"])if "Failed" in response:for msg_meta in response["Failed"]:logger.warning("Failed to send: %s: %s",msg_meta["MessageId"],# messages[int(msg_meta["Id"])]["body"])except ClientError as error:logger.exception("Send messages failed to queue: %s", sqs)raise errorelse:return response# 批量将笔记列表存入到队列中
async def batch_send_note_list(messages, size=10):for chunk in get_chunks(messages, size):entries = []for item in chunk:entries.append({"Id": item.get('id'),"MessageBody": json.dumps(item)})batch_send_message(entries)# 从队列中读取消息
def recesive_message(max_number=10, wait_time=10*3*60):try:# TODO:获取消息小于0条,停止获取任务response = sqs.receive_message(QueueUrl=queue_url,# MessageAttributeNames=["All"],  # 接收所有消息属性MaxNumberOfMessages=max_number,  # 一次接收的最大消息数 (1-10)# WaitTimeSecondes=wait_time,  # 可选,等待消息的时间 (长轮询)VisibilityTimeout=30  # 可选,消息的隐藏时间)# for msg in messages:#     logger.info("Recesived message: %s: %s", msg.message_id, msg.body)# 检查是否有消息if "Messages" in response:for message in response["Messages"]:# 打印消息体print(f"Message Body: {message['Body']}")# 如果有消息属性if "MessageAttributes" in message:print("Message Attributes")for key, value in message['MessageAttributes'].items():print(f"{key}: {value['StringValue']}")sqs.delete_message(QueueUrl=queue_url,ReceiptHandle=message['ReceiptHandle'])print("Message deleted from queue")except ClientError as error:logger.exception("Couldn't recesive messages from queue: %s", sqs)raise errorelse:return response# 以10为步长读取数组中的元素
def get_chunks(arr, chunk_size):for i in range(0, len(arr), chunk_size):yield arr[i: i + chunk_size]if __name__ == "__main__":print("Send Message")message = []# for i, msg in enumerate(message):#     send_message(json.dumps(msg))for chunk in get_chunks(message, 10):entries = []for item in chunk:entries.append({"Id": item.get('id'),"MessageBody": json.dumps(item)})batch_send_message(entries)# print("Recesive Message")# recesive_message()

http://www.mrgr.cn/news/57389.html

相关文章:

  • 如何保护服务器的系统日志
  • centos系统防火墙SELinux设置指令
  • Would you like conda to send this report to the core maintainers? [y/N]:
  • [Ansible实践笔记]自动化运维工具Ansible(一):初探ansibleansible的点对点模式
  • Vue01
  • deepinlinux v23安装pl2303的usb串口驱动
  • IBM Granite 3.0:一款开源,SOTA 企业模型
  • python画图|坐标轴显隐设置
  • 【开源鸿蒙】OpenHarmony 5.0轻量系统最小开发环境搭建
  • AI自主学习:未来的智能系统
  • 近似推断 - 最大后验推断和稀疏编码篇
  • AI学习指南深度学习篇-对比学习的变种
  • Python | Leetcode Python题解之第503题下一个更大元素II
  • SELinux详解
  • Golang | Leetcode Golang题解之第504题七进制数
  • 一文彻底搞透Redis的数据类型及具体的应用场景
  • 重温Java基础语法随笔录
  • 【QT】常用控件(四)
  • 12_Linux进程管理命令详解
  • 使用Dask在多块AMD GPU上加速XGBoost
  • 深度学习(五):语音处理领域的创新引擎(5/10)
  • 大模型的特点、重要概念及工作方式详解
  • Leetcode 875 KoKo Eats banana
  • 问:数据库,脏读、幻读、不可重复读~
  • 分布式系统集群中节点管理
  • C++ -stack、queue