使用Azure存储队列和Python自动发送推文
#twitter #python #azure #mentalhealth

十月是多动症意识月,今年十月,我想做些事情来为患有多动症和其他神经多样性的人们带来意识和鼓励。

跳到

除了在Refactr.Tech和出现和a few podcasts上讲话外,我还想创建一些东西:

1. Would have a wider reach. 
2. Would let me play with Python and Azure

如果我看着最大范围的位置,那就是 twitter 。因此,我决定创建一个自动化的Twitter广告系列。

如果我想和python一起玩 - 那么我可以在程序上发送Tweets

如果我想和Azure一起玩...我可以用 azure函数安排推文

您可以在Part 3

中看到结果

获取内容

如果您还没有看到本系列中的其他部分向可能被诊断或正在考虑寻求诊断的人们的匿名信息。

存储队列

我需要一种存储消息的方法,以便可以称呼它们。

最初,我一直在考虑使用像Cosmos DB这样的NOSQL数据库。我本可以添加schedule day,然后让脚本在当天拉消息。

这会起作用,但是旋转数据库以存储30个文档的感觉就像是很多。

我的下一个想法是做一些更轻的事情(例如sqlite),然后我想起了a talk from PyTexas在Python中使用queues。虽然我不希望脚本运行这么长时间,但我开始考虑将文本存储在队列中。这是我记得有Azure Storage Queues的时候。

my messages in azure storage queues

简而言之,存储队列允许您存储数百万个消息(字节或Unicode)。它的主要功能是从队列的顶部提取数据,然后删除消息或将其推到底部。

这是完美的,因为我已经有了消息列表。我只需要将它们添加到队列中。然后,我的Azure函数只会从队列处理消息中调用。

from azure.storage.queues import QueueClient
import json

queue = QueueClient.from_connection_string(
    conn_str=<CONNECTION_STRING>, queue_name=<QUEUE_NAME>
)

for idx, message in enumerate(messages):
    queue.send_message({"index": idx, "text": message})

# to receive the message

_msg = json.loads(queue.receive_message()['content'])

创建图像

将文本发送到Twitter似乎并不是最大的选择。有一个角色限制,研究表明tweets are more engaging when you add images。这给了我创建基本图像并覆盖图像上的文本的想法。

Base Image from Canva

我能够使用Canva构建一个简单的基础图像。然后,我使用Pillow叠加了文本。在图像上使用文本很复杂,因此我选择创建4个类别和每个元素的比率。

{
    "md": 0.010,
    "lg": 0.012,
    "xl": 0.014,
    "2xl": 0.020,
}

然后,我使用该比率自动自动文本,直到边界框是所需的尺寸。

draw = ImageDraw.Draw(image)
font_size = 1
font = ImageFont.truetype("assets/Lato-BoldItalic.ttf", font_size)

while font.getbbox(text)[1] < image_size_ratio:
   font_size += 1
   font = ImageFont.truetype("assets/Lato-BoldItalic.ttf", font_size)

这不是完美的,但是很好。然后我在图像中添加了索引和一个主题标签,然后我准备就绪。

Example Image from the Queue

Twitter访问

下一步是发送推文。 Twitter有一个API,您可以在其中与应用程序进行交互。最基本的访问是免费的。但是,为了推特图像,我需要请求提升访问。我没有直接使用API​​,而是选择使用Python软件包Tweepy

首先使用客户端和访问密钥和秘密连接您的Twitter帐户。然后,您需要使用V1 API或(API)将图像存储为对象。

import tweepy
from bytes import BytesIO #This (lets you save the image in memory and not to a file)

auth = tweepy.OAuth1UserHandler(
            consumer_key=self.consumer_key,
            consumer_secret=self.consumer_secret,
            access_token=self.access_token,
            access_token_secret=self.access_token_secret,
)

image = BytesIO()
img.save(image, format="PNG")
image.seek(0)
api = tweepy.API(auth)
media = api.media_upload(file=image, filename="my_file.png")

然后,您使用v2 api(Client)发送媒体嵌入消息。

# with the message queued message as _msg
self.twitterv2.create_tweet(
        "text": f"{_msg['index']}: {_msg['text']} #31DaysOfNeurodivergence",
        "file": media.id
        "filename": f"{_msg['index']}{_msg['text'][:10]}",
    })

改善过程

弄清楚这一过程存在一些挑战。我和我的队友Pamela Fox和我整理了一个名为AZ Queue Tweeter的小模块,该模块在此过程中扩展了,您不仅可以将带有图像的推文发送到存储队列,还可以在Twitter上支持全部个人帐户。这是帕梅拉(Pamela)发送Twitter民意调查的一个示例。

import json

qt.queue_message(
    json.dumps({
        "text": "Whats your fav Python web framework?",
        "poll_options": ["Flask", "Django", "FastAPI", "All of em!"],
        "poll_duration_minutes": 60*24}
    )
)

qt.send_next_message(message_transformer=lambda msg: json.loads(msg))

Pamela's Resulting Twitter Poll

这也引入了在发送文本之前操纵文本的能力。在我的脚本中,我使用spacy将消息细分为句子,将文本保持在280个字符中。

运行脚本

所以我们已经浏览了如何制作推文。我们如何确保推文每天运行?这是Azure功能开始播放的地方。我能够创建一个简单的计时器功能,该功能每天在10月份运行一次。

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 0 12 * 10 *"
    }
  ]
}

您可以使用CLIVS Code

The Function Section in VS Code

然后传递一个检索消息,创建图像并发送推文的函数。

def main(mytimer: func.TimerRequest) -> None:
    queue = QueueTweeter(storage_auth=sa, twitter_auth=ta)
    msg = queue.send_next_message(
        message_transformer=load_message, preview_mode=False, delete_after=False
    )

    # Message is in format "Index - Text"
    logging.info(f"{msg} triggered at {datetime.datetime.utcnow()}")

这是值得的努力吗

当我想到我遇到的问题时(很多推文在很长一段时间内发送),我认为我没有很多“完全自动化”的选择。

我确实学习了一些新工具和一些有趣的解决方案(看着您的枕头),我本月的总费用不到0.10美元。

Monthly Cost to run

解决问题实际上非常有价值,因为我现在打算使用相同的工具来自动化我的一些播客和其他内容。我还可以快速修改它以使用Azure队列将消息发送到其他频道(例如Discord或Teams)。

我已经使用烧瓶 + htmx将此队列转换为website

Flask Site Using the Queue

一探究竟

Check out the repo看看我如何能够构建此活动。最后查看AZ Queue Tweeter以创建自己的广告系列!