tl; dr 本文旨在为MindSDB创建新应用程序(Gmail)处理程序,然后使用该处理程序创建一个电子邮件bot,以回复传入的电子邮件有趣而诗意。
介绍
AI Hype拒绝死亡,更是如此,在聊天GPT发布后,以及最近的GPT4。到目前为止,我一直缺少AI动作,因此,当MindSDB宣布一场黑客马拉松,我看到了他们使用OpenAi API的Twitter机器人实施,我知道我想为Hackathon构建一个机器人。
但是,问题出现了,这是一个用于什么目的和应用的机器人? Twitter已经完成并尘土飞扬。我希望我的机器人成为一个始终可用的明智而机智的伴侣,所以这帮助我在Gmail机器上将其归零。但是Gmail集成尚未实施,这是选择构建它的第二个原因。练习我的Python技能以及为一个好项目做出贡献的想法是其他重要原因。
设置环境
由于第一个任务是开发Gmail处理程序,因此我们必须设置开发环境。但是在此之前,我需要了解how to contribute to this project和how to install MindsDb for development。在安装过程中,我只面对与libmagic
相关的一个问题,该问题默认情况下未安装在Mac上,因此必须使用brew install libmagic
进行安装。
下一步是学习basics of creating an app handler。这给了我一个很好的概述,概述了我应该为创建Gmail处理程序做什么。
浏览相关文档,如果您想为任何现有项目做出贡献,则遵循上述步骤至关重要。
运行现有安装
现在是时候让我的手变得肮脏了,朝这个方向的第一步是使用现有的应用程序处理程序。但是在此之前,我遵循了本地MindsDB Web控制台中“ Learning Hub”的“预测房屋价格”教程,一切都很好。
接下来是Twitter处理程序的转弯。我尝试使用Local MindSDB浏览器控制台中的以下命令创建一个Tweets数据库。
CREATE DATABASE my_twitter
WITH
ENGINE = 'twitter',
PARAMETERS = {
"bearer_token": "twitter bearer token",
"consumer_key": "twitter consumer key",
"consumer_secret": "twitter consumer key secret",
"access_token": "twitter access token",
"access_token_secret": "twitter access token secret"
};
至少应该出错说凭据无效,但是我得到以下错误
Can't connect to db: Handler 'twitter' can not be used
好吧,那真是令人叹为观止。为什么它不起作用?这是您开始调试并找出代码库中发生的事情的地方。我们该怎么做?我只是在代码库中搜索了错误字符串"Can't connect to db"
,发现问题与处理程序没有导入有关。在进行了更多调查之后,ZSH控制台具有此信息消息
Dependencies for the handler 'twitter' are not installed by default. If you want to use "twitter" please install "['tweepy']"
,我们有。这使我们有一个线索,如果我们想使用任何其他处理程序(基本处理程序除外),我们需要手动安装他们的依赖项。
pip install tweepy
和重新启动MindSDB足以获得我首先希望的错误
Can't connect to db: Error connecting to Twitter api: 401 Unauthorized Unauthorized. Check bearer_token
现在我们都准备开发。正如文档所说,要研究Twitter处理程序,我只是创建了Twitter_handler文件夹的副本,然后将其重命名为Gmail_handler。然后在__init__.py
和__about__.py
文件中用“ Gmail”替换为“ Twitter”,以及gmail_handler
文件中的相关方法名称更改。通过执行相同的Twitter创建数据库命令来验证它,但用“ Gmail”替换引擎,它似乎调用我们的Gmail_handler。
实施Gmail处理程序
通过how to create an application handler中提到的步骤进行操作,我们需要修改以下方法。在读取/写电子邮件之前,我们需要对用户进行身份验证,因此第一个目标是connect
和check_connection
方法。
为Gmail API设置Google项目
要使用Gmail API,我们需要设置一个Google Cloud Project和一个使用Gmail启用的Google帐户。我们还需要从Google Cloud Console启用Gmail API。
然后,我们需要为身份验证用户创建OAuth客户端ID,并可能是Auth同意屏幕(如果这是我们第一次设置OAuth)
设置OAuth客户端ID将为我们提供一个凭据文件,我们在Gmail_handler中需要该文件以进行连接。您可以找到有关how to set up a Google project for the Gmail APIs here的更多信息。
构成GmailHandler课程
我们使用连接参数(使用创建数据库命令传递)并将其存储以备将来使用。我们还注册了一个"emails"
表,我们将存储我们的数据。
class GmailHandler(APIHandler):
"""A class for handling connections and interactions with the Gmail API.
Attributes:
credentials_file (str): The path to the Google Auth Credentials file for authentication
and interacting with the Gmail API on behalf of the uesr.
scopes (List[str], Optional): The scopes to use when authenticating with the Gmail API.
"""
def __init__(self, name=None, **kwargs):
super().__init__(name)
self.connection_args = kwargs.get('connection_data', {})
self.credentials_file = self.connection_args['credentials_file']
self.scopes = self.connection_args.get('scopes', DEFAULT_SCOPES)
self.token_file = None
self.max_page_size = 500
self.max_batch_size = 100
self.service = None
self.is_connected = False
emails = EmailsTable(self)
self._register_table('emails', emails)
处理Google身份验证
按照上一节中的链接以及MindSDB代码要求,我们需要执行以下
-
使用以下内容替换gmail_handler文件夹中的cumistry.txt的内容。请记住使用PIP安装命令安装这些模块
google-api-python-client google-auth-httplib2 google-auth-oauthlib
-
connect
方法。在这里,我们使用上一节中创建的凭据文件来验证用户。
def connect(self): """Authenticate with the Gmail API using the credentials file. Returns ------- service: object The authenticated Gmail API service object. """ if self.is_connected is True: return self.service self.service = self.create_connection() self.is_connected = True return self.service def create_connection(self): creds = None token_file = os.path.join(os.path.dirname(self.credentials_file), 'token.json') if os.path.isfile(token_file): creds = Credentials.from_authorized_user_file(token_file, self.scopes) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) elif not os.path.isfile(self.credentials_file): raise Exception('Credentials must be a file path') else: flow = InstalledAppFlow.from_client_secrets_file(self.credentials_file, self.scopes) creds = flow.run_local_server(port=0, timeout_seconds=120) # Save the credentials for the next run with open(token_file, 'w') as token: token.write(creds.to_json()) return build('gmail', 'v1', credentials=creds)
-
check_connection
方法
def check_connection(self) -> StatusResponse: """Check connection to the handler. Returns ------- StatusResponse Status confirmation """ response = StatusResponse(False) try: # Call the Gmail API service = self.connect() result = service.users().getProfile(userId='me').execute() if result and result.get('emailAddress', None) is not None: response.success = True except HttpError as error: response.error_message = f'Error connecting to Gmail api: {error}.' log.logger.error(response.error_message) if response.success is False and self.is_connected is True: self.is_connected = False return response
现在,我们准备运行创建数据库命令并验证用户(当然,我们尚未决定表的列,但我们会来到这一点)。只需运行
CREATE DATABASE mindsdb_gmail
WITH ENGINE = 'gmail',
PARAMETERS = {
"credentials_file": "mindsdb/integrations/handlers/gmail_handler/credentials.json"
};
从Gmail API获取电子邮件
使用MindSDB获取电子邮件的流量是这样的:您执行SQL SELECT
查询,而APITable
类的select
方法被调用。在那里,您可以解析查询参数,最后调用Gmail API。
- 电子邮件stable的
select
方法
class EmailsTable(APITable):
"""Implementation for the emails table for Gmail"""
def select(self, query: ast.Select) -> Response:
"""Pulls emails from Gmail "users.messages.list" API
Parameters
----------
query : ast.Select
Given SQL SELECT query
Returns
-------
pd.DataFrame
Email matching the query
Raises
------
NotImplementedError
If the query contains an unsupported operation or condition
"""
conditions = extract_comparison_conditions(query.where)
params = {}
for op, arg1, arg2 in conditions:
if op == 'or':
raise NotImplementedError(f'OR is not supported')
if arg1 in ['query', 'label_ids', 'include_spam_trash']:
if op == '=':
if arg1 == 'query':
params['q'] = arg2
elif arg1 == 'label_ids':
params['labelIds'] = arg2.split(',')
else:
params['includeSpamTrash'] = arg2
else:
raise NotImplementedError(f'Unknown op: {op}')
else:
raise NotImplementedError(f'Unknown clause: {arg1}')
if query.limit is not None:
params['maxResults'] = query.limit.value
result = self.handler.call_gmail_api(
method_name='list_messages',
params=params
)
# filter targets
columns = []
for target in query.targets:
if isinstance(target, ast.Star):
columns = []
break
elif isinstance(target, ast.Identifier):
columns.append(target.parts[-1])
else:
raise NotImplementedError(f"Unknown query target {type(target)}")
if len(columns) == 0:
columns = self.get_columns()
# columns to lower case
columns = [name.lower() for name in columns]
if len(result) == 0:
result = pd.DataFrame([], columns=columns)
else:
# add absent columns
for col in set(columns) & set(result.columns) ^ set(columns):
result[col] = None
# filter by columns
result = result[columns]
return result
-
get_columns
方法。这些是我们的"EmailsTable"
拥有的列。
def get_columns(self):
"""Gets all columns to be returned in pandas DataFrame responses
Returns
-------
List[str]
List of columns
"""
return [
'id',
'message_id',
'thread_id',
'label_ids',
'from',
'to',
'date',
'subject',
'snippet',
'body',
]
-
GmailHandler
类的call_gmail_api
方法。 Gmail消息API的工作方式是,首先,它返回符合查询条件的消息列表。这些消息仅包含threadID和messageID等,而不包含完整的电子邮件。然后,使用"messageIds"
您分别获取完整消息。
def call_gmail_api(self, method_name: str = None, params: dict = None):
"""Call Gmail API and map the data to pandas DataFrame
Args:
method_name (str): method name
params (dict): query parameters
Returns:
DataFrame
"""
service = self.connect()
if method_name == 'list_messages':
method = service.users().messages().list
elif method_name == 'send_message':
method = service.users().messages().send
else:
raise NotImplementedError(f'Unknown method_name: {method_name}')
left = None
count_results = None
if 'maxResults' in params:
count_results = params['maxResults']
params['userId'] = 'me'
data = []
limit_exec_time = time.time() + 60
while True:
if time.time() > limit_exec_time:
raise RuntimeError('Handler request timeout error')
if count_results is not None:
left = count_results - len(data)
if left == 0:
break
elif left < 0:
# got more results that we need
data = data[:left]
break
if left > self.max_page_size:
params['maxResults'] = self.max_page_size
else:
params['maxResults'] = left
log.logger.debug(f'Calling Gmail API: {method_name} with params ({params})')
resp = method(**params).execute()
if 'messages' in resp:
self._handle_list_messages_response(data, resp['messages'])
elif isinstance(resp, dict):
data.append(resp)
if count_results is not None and 'nextPageToken' in resp:
params['pageToken'] = resp['nextPageToken']
else:
break
df = pd.DataFrame(data)
return df
- 内部方法
_handle_list_messages_response
和其他相关方法
# Handle the API response by downloading the full messages
# using a Batch Request.
def _handle_list_messages_response(self, data, messages):
total_pages = len(messages) // self.max_batch_size
for page in range(total_pages):
self._get_messages(data, messages[page * self.max_batch_size:(page + 1) * self.max_batch_size])
# Get the remaining messsages, if any
if len(messages) % self.max_batch_size > 0:
self._get_messages(data, messages[total_pages * self.max_batch_size:])
def _get_messages(self, data, messages):
batch_req = self.service.new_batch_http_request(lambda id, response, exception: self._parse_message(data, response, exception))
for message in messages:
batch_req.add(self.service.users().messages().get(userId='me', id=message['id']))
batch_req.execute()
# This method shows how to parse the full email returned
# by the Gmail API
def _parse_message(self, data, message, exception):
if exception:
log.logger.error(f'Exception in getting full email: {exception}')
return
payload = message['payload']
headers = payload.get("headers")
parts = payload.get("parts")
row = {
'id': message['id'],
'thread_id': message['threadId'],
'label_ids': message.get('labelIds', []),
'snippet': message.get('snippet', ''),
}
if headers:
for header in headers:
key = header['name'].lower()
value = header['value']
if key in ['from', 'to', 'subject', 'date']:
row[key] = value
elif key == 'message-id':
row['message_id'] = value
row['body'] = self._parse_parts(parts)
data.append(row)
def _parse_parts(self, parts):
if not parts:
return
body = ''
for part in parts:
if part['mimeType'] == 'text/plain':
part_body = part.get('body', {}).get('data', '')
body += urlsafe_b64decode(part_body).decode('utf-8')
elif part['mimeType'] == 'multipart/alternative' or 'parts' in part:
# Recursively iterate over nested parts to find the plain text body
body += self._parse_parts(part['parts'])
else:
log.logger.debug(f"Unhandled mimeType: {part['mimeType']}")
return body
以上足以在数据库中获取和存储身份验证的用户的电子邮件。我们可以在下面运行SQSL选择查询以获取电子邮件。查询参数支持Gmail API中可用的所有filter options
SELECT *
FROM mindsdb_gmail.emails
WHERE query = 'from:test@example.com OR search_text OR from:test@example1.com'
AND label_ids = "INBOX,UNREAD"
LIMIT 20;
使用Gmail API发送电子邮件
要通过Gmail API和MindSDB发送电子邮件,我们需要使用SQL INSERT
查询。这反过
def insert(self, query: ast.Insert):
"""Sends emails using the Gmail "users.messages.send" API
Parameters
----------
query : ast.Insert
Given SQL INSERT query
Raises
------
ValueError
If the query contains an unsupported condition
"""
columns = [col.name for col in query.columns]
if self.handler.connection_args.get('credentials_file', None) is None:
raise ValueError(
"Need the Google Auth Credentials file in order to write an email"
)
supported_columns = {"message_id", "thread_id", "to_email", "subject", "body"}
if not set(columns).issubset(supported_columns):
unsupported_columns = set(columns).difference(supported_columns)
raise ValueError(
"Unsupported columns for create email: "
+ ", ".join(unsupported_columns)
)
for row in query.values:
params = dict(zip(columns, row))
if not 'to_email' in params:
raise ValueError('"to_email" parameter is required to send an email')
message = EmailMessage()
message['To'] = params['to_email']
message['Subject'] = params['subject'] if 'subject' in params else ''
content = params['body'] if 'body' in params else ''
message.set_content(content)
# If threadId is present then add References and In-Reply-To headers
# so that proper threading can happen
if 'thread_id' in params and 'message_id' in params:
message['In-Reply-To'] = params['message_id']
message['References'] = params['message_id']
encoded_message = urlsafe_b64encode(message.as_bytes()).decode()
message = {
'raw': encoded_message
}
if 'thread_id' in params:
message['threadId'] = params['thread_id']
self.handler.call_gmail_api('send_message', {'body': message})
此方法调用了我们之前看到的同一call_gmail_api
发送电子邮件。我们可以使用SQL INSERT
查询发送电子邮件。仅当我们回复传入的电子邮件并希望我们的答复应与原始电子邮件形成线程时,只需要thread_id
和message_id
参数值。 ("subject"
应该与原始主题行完全匹配以使其工作)
INSERT INTO mindsdb_gmail.emails (thread_id, message_id, to_email, subject, body)
VALUES ('187cbdd861350934d', '8e54ccfd-abd0-756b-a12e-f7bc95ebc75b@Spark', 'test@example2.com', 'Trying out MindsDB',
'This seems awesome. You must try it out whenever you can.')
创建Gmail机器人
现在我们没有阻碍,可以使用我们闪亮的新GmailHandler轻松获取/发送电子邮件,我们可以使用Gmail Bot进行工作。
获得OpenAI API键
由于我们在本地开发此过程,因此我们没有使用MindSDB Cloud提供的内置API密钥的奢侈品。我们需要在OpenAI上创建一个帐户并创建一个API密钥。
使用MindSDB训练模型
我无法访问GPT4 API,因此我正在使用gpt-3.5-turbo
模型本身。我们只是告诉GPT,以适当的致敬和签名来回复电子邮件,并保持电子邮件的随意。这是使用提示_template参数完成的。
CREATE MODEL mindsdb.gpt_model
PREDICT response
USING
engine = 'openai',
max_tokens = 500,
api_key = '<your_api_key>',
model_name = 'gpt-3.5-turbo',
prompt_template = 'From input message: {{input_text}}\
by from_user: {{from_email}}\
In less than 500 characters, write an email response to {{from_email}} in the following format:\
Start with proper salutation and respond with a short message in a casual tone, and sign the email with my name mindsdb';
培训完成后,我们就可以看到我们的机器人的行动。运行以下命令
SELECT response
FROM mindsdb.gpt_model_email
WHERE from_email = "alice@example.com"
AND input_text = "Hi there, I'm bored. Give me a puzzle to solve";
我们得到以下回应,似乎还不错,不是吗?
要求一个新的难题,它说以下
给机器人一个角色
由于我们最初的实验似乎效果很好,因此我们现在可以冒险。让我们从《星球大战》电影和著名诗人埃德加·艾伦·坡(Edgar Allan Poe)中给我们的机器人一个尤达大师的角色。我们通过在我们的早期命令中更改提示_template来做到这一点
CREATE MODEL mindsdb.gpt_model_yodapoe
PREDICT response
USING
engine = 'openai',
max_tokens = 800,
api_key = '<your_api_key>',
model_name = 'gpt-3.5-turbo', -- you can also use 'text-davinci-003' or 'gpt-3.5-turbo'
prompt_template = 'From input message: {{input_text}}\
by from_user: {{from_email}}\
In less than 500 characters, write an email response to {{from_email}} in the following format:\
<respond with a 4 line poem as if you were Edgar Allan Poe but you are also a wise elder like Master Yoda from the Star Wars movies. The wordings should be like Master Yoda but the format should be like Poe. Do not mention that you are Master Yoda or Edgar Allan Poe. Sign it with a made up quote similar to what Voltaire, Nietzsche etc would say. Do not explain or say anything else about the quote.>';
训练模型,然后通过更改模型名称来运行相同的选择查询
SELECT response
FROM mindsdb.gpt_model_yodapoe
WHERE from_email = "alice@example.com"
AND input_text = "Hi there, I'm bored. Give me a puzzle to solve";
这就是我得到的
将输入文本更改为以下
SELECT response
FROM mindsdb.gpt_model_yodapoe
WHERE from_email = "alice@example.com"
AND input_text = "Hi there, What's in a hackathon?";
我们得到以下结果
总的来说,这似乎很好。我们总是可以根据自己的口味微调角色。现在,我们可以将此响应一代与实际的电子邮件发送联系,我们最好创建一个计划的作业,以定期阅读电子邮件,并根据一些预定的标准回复它们。
结果
当我开始从事此功能时,我的目标及其各自的结果
-
创建Gmail处理程序:我认为这已经完成。我可能需要在适当的时候解决一些错误
-
为MindSDB项目做出了贡献:我已经为我的更改做了opened a PR,现在我希望它合并到代码库中。
-
创建一个gmail bot:我们已经准备好所有成分,我们只需要在某个地方部署它,以便始终可用。我确实尝试将源部署在液滴上,但我遇到了一个错误,我筹集了GitHub issue。
-
练习我的python技能:我认为在研究该功能时,我在此方面取得了良好的进步。 Python不是我的专业领域,所以我很高兴能在7-8天的短期内创建一个工作集成。
结论
总的来说,创建一个Gmail机器人和MindSDB所需的Gmail处理程序非常有趣和有趣。在此过程中,我必须看到MindSDB代码库的内部工作。我学会了使用Gmail API以及如何在幕后构建电子邮件以及如何解析它。它还使我可以使用MindSDB与OpenAi进行集成,现在我也可以说AI正在吃世界: - )。
希望您喜欢阅读文章。如果您注意到任何错误或任何问题,请在评论中告诉我。
- 继续添加位,很快您的字节超过您可能需要的。