使用MongoDB,Spark,Twilio SMS通知和破折号创建选举监视系统
在本文中,我们提供了一种创新解决方案的概念验证(POC),该解决方案在选举监视的背景下应对这一挑战。该解决方案是为一个与一家专门从事数据的年轻数字公司联系的政府设计的,希望选举结果更加透明,易于访问和实时。
该系统旨在摄入选民数据,对其进行处理和分析,一旦准备就绪,通过SMS提醒媒体和有关各方,并最终通过DASH应用程序在交互式地图上显示结果。
数据管道
在此上下文中,Spark群集是用一个工作节点设置的,该节点将执行Spark Master分配的任务。此设置允许有效地处理数据处理任务,如有必要
系统处理的数据来自有趣的来源:投票记录的合成数据集。使用Python图书馆Faker的脚本生成了此数据,模仿了美国和哥伦比亚特区的现实投票行为。合成数据存储在MongoDB中,MongoDB是一个流行的NOSQL数据库,以其灵活性和可扩展性而闻名,这是处理大型数据集(如投票记录)的绝佳选择。
from datetime import datetime
from faker import Faker
from pymongo import MongoClient
# Init Faker
fake = Faker()
## Init MongoDB
client = MongoClient('mongodb://root:example@localhost:27017/')
db = client['admin']
collection = db['votes']
state_weights = {
"Alabama": (0.60, 0.40),
"Alaska": (0.55, 0.45),
"Arizona": (0.15, 0.85),
"Arkansas": (0.20, 0.80),
"California": (0.15, 0.85),
"Colorado": (0.70, 0.30),
"Connecticut": (0.10, 0.90),
"Delaware": (0.34, 0.66),
"Florida": (0.82, 0.18),
"Georgia": (0.95, 0.05),
"Hawaii": (0.50, 0.50),
"Idaho": (0.67, 0.33),
"Illinois": (0.60, 0.40),
"Indiana": ((0.15, 0.85)),
"Iowa": (0.45, 0.55),
"Kansas": (0.40, 0.60),
"Kentucky": (0.62, 0.38),
"Louisiana": (0.58, 0.42),
"Maine": (0.60, 0.40),
"Maryland": (0.55, 0.45),
"Massachusetts": (0.63, 0.37),
"Michigan": (0.62, 0.38),
"Minnesota": (0.61, 0.39),
"Mississippi": (0.41, 0.59),
"Missouri": (0.60, 0.40),
"Montana": (0.57, 0.43),
"Nebraska": (0.56, 0.44),
"Nevada": (0.55, 0.45),
"New Hampshire": (0.54, 0.46),
"New Jersey": (0.53, 0.47),
"New Mexico": (0.52, 0.48),
"New York": (0.51, 0.49),
"North Carolina": (0.50, 0.50),
"North Dakota": (0.05, 0.95),
"Ohio": (0.58, 0.42),
"Oklahoma": (0.57, 0.43),
"Oregon": (0.56, 0.44),
"Pennsylvania": (0.55, 0.45),
"Rhode Island": (0.50, 0.50),
"South Carolina": (0.53, 0.47),
"South Dakota": (0.48, 0.52),
"Tennessee": (0.51, 0.49),
"Texas": (0.60, 0.40),
"Utah": (0.59, 0.41),
"Vermont": (0.58, 0.42),
"Virginia": (0.57, 0.43),
"Washington": (0.44, 0.56),
"West Virginia": (0.55, 0.45),
"Wisconsin": (0.46, 0.54),
"Wyoming": (0.53, 0.47),
"District of Columbia": ((0.15, 0.85))
}
def generate_vote(state):
weights = state_weights.get(state, (0.50, 0.50)) # Get the weights for the state, or use (0.50, 0.50) as a default
vote = {
"voting_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), # Updated line
"voter": {
"voter_id": str(fake.unique.random_number(digits=9)),
"first_name": fake.first_name(),
"last_name": fake.last_name(),
"address": {
"street": fake.street_address(),
"city": fake.city(),
"state": state,
"zip_code": fake.zipcode()
},
"birth_date": str(fake.date_of_birth(minimum_age=18, maximum_age=90)),
"gender": fake.random_element(elements=('Male', 'Female')),
},
"vote": {
"voting_date": "2023-06-06",
"voting_location": fake.address(),
"election": {
"type": "Presidential Election",
"year": "2023"
},
"vote_valid": "Yes",
"voting_choice": {
"party": fake.random_element(elements=('Republican', 'Democrat')),
}
}
}
return vote
# List of states
states = ["Alabama", "Alaska", "Arizona", "Arkansas", "California", "Colorado",
"Connecticut", "Delaware", "Florida", "Georgia", "Hawaii", "Idaho", "Illinois",
"Indiana", "Iowa", "Kansas", "Kentucky", "Louisiana", "Maine", "Maryland",
"Massachusetts", "Michigan", "Minnesota", "Mississippi", "Missouri", "Montana",
"Nebraska", "Nevada", "New Hampshire", "New Jersey", "New Mexico", "New York",
"North Carolina", "North Dakota", "Ohio", "Oklahoma", "Oregon", "Pennsylvania",
"Rhode Island", "South Carolina", "South Dakota", "Tennessee", "Texas", "Utah",
"Vermont", "Virginia", "Washington", "West Virginia", "Wisconsin", "Wyoming",
"District of Columbia"]
# Generate fake voting data for each state and insert into MongoDB
for state in states:
for i in range(1, 61):
vote = generate_vote(state)
collection.insert_one(vote)
print(f"All votes saved to MongoDB")
对于每个州,综合数据基于预定义的概率模拟选民选择,以反映历史投票模式。这些数据由每个州的60个选民组成,是Spark Processing System的输入。
SPARK系统处理数据,确定每个状态下的获胜方。然后,它计算了每一方赢得的选票百分比。然后将这些关键信息馈入SMS通知系统,提醒媒体和具有实时选举结果的相关方。
version: '3.1'
services:
# ===================== #
# Apache Spark #
# ===================== #
spark:
image: bitnami/spark:3.3.1
environment:
- SPARK_MODE=master
ports:
- '8080:8080'
- '7077:7077'
volumes:
- ./data:/data
- ./src:/src
spark-worker:
image: bitnami/spark:3.3.1
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark:7077
- SPARK_WORKER_MEMORY=4G
- SPARK_EXECUTOR_MEMORY=4G
- SPARK_WORKER_CORES=4
ports:
- '8081:8081'
volumes:
- ./data:/data
- ./src:/src
# ===================== #
# MongoDB #
# ===================== #
mongo:
image: mongo:4.4
volumes:
- ./mongo:/data/db
ports:
- '27017:27017'
environment:
- MONGO_INITDB_ROOT_USERNAME=root
- MONGO_INITDB_ROOT_PASSWORD=example
mongo-express:
image: mongo-express
ports:
- '8091:8081'
environment:
- ME_CONFIG_MONGODB_ADMINUSERNAME=root
- ME_CONFIG_MONGODB_ADMINPASSWORD=example
- ME_CONFIG_MONGODB_SERVER=mongo
- ME_CONFIG_MONGODB_PORT=27017
使用Pyspark(作业1)进行数据处理
-
创建一个火花:该代码启动了Sparksession,这是任何Spark功能的切入点。启动时,它将连接到存储数据的MongoDB数据库。
-
加载数据:然后,代码将来自MongoDB的数据读取并将其加载到数据框架中,该数据框架是组织到命名列的数据的分布式集合。它与关系数据库中的表相似,并且可以通过类似的方式进行操纵。
-
数据处理:该代码从数据框架(表决的状态,党派和有效性)中选择相关字段,按州和一方分组,并计算每个州中每个方的有效票数。
-
查找获胜者:接下来,该代码在每个州都以最多的选票找到了该党。它是通过根据获得的选票数量在每个州内对各方进行排名,然后选择最高排名的选票数(即,票数最多的选票)。
。
-
计算百分比:随后计算每个获胜方获得的选票百分比。它通过将获胜方获得的选票数量除以该州的总投票,而乘以100。
-
写入结果:最后,该代码保存了结果,其中包括获胜方及其在每个州的投票百分比,回到MongoDB,但在一个名为“ election_results”的不同集合中。
本质上,该代码处理投票记录,以确定赢得每个州赢得最多投票的一方,并计算获奖方获得的该州的总投票的百分比。该分析可以清楚地了解选举中的投票分布。
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, expr, col
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# Create a SparkSession
spark = SparkSession.builder \
.appName('MongoDBIntegration') \
.config("spark.mongodb.input.uri", "mongodb://root:example@mongo:27017/admin.votes") \
.getOrCreate()
# Load the MongoDB data into a DataFrame
df = spark.read.format("mongo").load()
# Extract relevant fields and group by state and party
result = df.select(
df["voter.address.state"].alias("state"),
df["vote.voting_choice.party"].alias("party"),
df["vote.vote_valid"].alias("validity")
).where(col("validity") == "Yes").groupby("state", "party").agg(count("validity").alias("votes"))
# Find the party with the most votes in each state
winners = result.withColumn("rn", F.row_number().over(Window.partitionBy("state").orderBy(F.desc("votes")))).filter(col("rn") == 1).drop("rn")
# Calculate the percentage of victory
total_votes = result.groupby("state").agg(F.sum("votes").alias("total_votes"))
winners_with_percentage = winners.join(total_votes, "state").withColumn("percentage", (col("votes") / col("total_votes")) * 100)
# Write the result to MongoDB
winners_with_percentage.write.format("mongo").mode("overwrite").option("spark.mongodb.output.uri", "mongodb://root:example@mongo:27017/admin.election_results").save()
输出:
{
_id: ObjectId('64873b3df42ba41d32f3d1a6'),
state: 'Utah',
party: 'Republican',
votes: 127,
total_votes: 240,
percentage: 52.916666666666664
}
使用Pyspark(工作2)的数据处理
-
创建一个火花和负载数据:脚本启动SparkSession,然后从MongoDB集合中加载数据。
-
国家通过国家进行选举投票:美国使用一种名为选举学院的制度来决定总统选举的结果。每个州在选举学院都有许多选票,这些选票与其人口成正比。该脚本创建了一个词典,该字典将每个状态映射到其选举票数。然后将该字典转换为数据框。
-
与选举数据一起加入选举投票:脚本将选举结果数据与选举投票数据结合在一起,基于州名称。这为我们提供了一个数据框架,每行都有州名称,当事方,当事人收到的票数以及州拥有的选举票数。
-
计算全国性的投票:脚本计算全国各方收到的总票数。
-
确定全国冠军:脚本确定在全国范围内获得最多投票的当事方。
-
计算最大国家投票并处理关系:脚本通过向全国性的获胜者提供选举票来确定每个州收到的最大票数,并处理联系。
-
计算每个一方的总选举人:剧本随后计算全国各方的选举选票总数(大选),考虑到抢七局的规则。
。 -
保存结果:脚本将选举票保存回到MongoDB。
-
通过SMS通知结果:使用Twilio,在线消息服务,脚本将SMS发送带有选举结果的SMS。结果格式为字符串,其中包括每一方和他们赢得的选举票数。
-
停止火花:最后,脚本停止了火花,发布了资源。
来自pyspark.sql进口火花
从pyspark.sql.functions导入最大
从pyspark.sql导入为f
来自pyspark.sql.window导入窗口
来自Twilio.rest Import Clientspark = sparksession.builder \
.appName(“ aperectionResults”) 。 .getorCreate()df = spark.read.format(“ mongo”)。load()
按国家创建一个大选词典
electors_dict = {
“阿拉巴马州”:9,
“阿拉斯加”:3,
“亚利桑那”:11,
“阿肯色”:6,
“加利福尼亚”:55,
“科罗拉多州”:9,
“康涅狄格”:7,
“特拉华州”:3,
“佛罗里达”:29,
“佐治亚州”:16,
“夏威夷”:4,
“爱达荷州”:4,
“伊利诺伊州”:20,
“印第安纳州”:11,
“爱荷华州”:6,
“堪萨斯州”:6,
“肯塔基”:8,
“路易斯安那州”:8,
“缅因州”:4,
“马里兰州”:10,
“马萨诸塞州”:11,
“密歇根州”:16,
“明尼苏达州”:10,
“密西西比州”:6,部分 “密苏里州”:10,
“蒙大拿州”:3,
“内布拉斯加州”:5,
“内华达州”:6,
“新罕布什尔州”:4,
“新泽西”:14,
“新墨西哥”:5,
“纽约”:29,
“北卡罗来纳州”:15,
“北达科他州”:3,
“俄亥俄”:18,
“俄克拉荷马州”:“
“俄勒冈”:7,
“宾夕法尼亚州”:20,
“罗德岛”:4,
“南卡罗来纳州”:9,
“南达科他州”:3,
“田纳西州”:11,
“德克萨斯州”:38,
“犹他州”:6,
“佛蒙特州”:3,
“弗吉尼亚”:13,
“华盛顿”:12,
“西弗吉尼亚州”:5,
“威斯康星州”:10,
“怀俄明州”:3,
“哥伦比亚特区”:3
}将字典转换为数据框
epletors_df = spark.CreateDataFrame([[(k,v)用于epletors_dict.items()],[“状态”,“选举者”])
df = df.join(electors_df,on =“ state”,how =“ innit”)
nationwide_df = df.groupby(“ party”)。agg(f.sum(“ fotes”)。别名(“ total_votes”))
nationwide_winner = natifwide_df.orderby(f.desc(“ total_votes”))。first()[0]
确定每个州的最高选票
state_max_df = df.groupby(“ state”)。agg(max(“ fotes”)。别名(“ max_votes”))
df = df.join(state_max_df,on =“ state”,how =“ innit”)
window = window.partitionby(df ['state'])
来自pyspark.sql.functions导入
时df = df.withcolumn('winners',f.sum(当(df.votes == df.max_votes,1)。
df = df.withcolumn('final_party',当(df.winners> 1,nativewide_winner)。
result_df = df.groupby(“ final_party”)。sum(“选民”)
将结果保存到mongodb
result_df.write.format(“ mongo”)。option(“ uri”,“ mongodb:// root:example@mongo:27017/admin.election.election_results_out”).mode(“ operWrite”) /p>
Account_Sid =''
auth_token =''
client = client(account_sid,auth_token)结果= result_df.collect()
result_str =“ \ n” .join([f“ [f'final_party']}:{row ['sum(epletors)']} electors for Result in Relec
message_body = f“亲爱的收件人,\ n \ nwe很高兴与您分享最终的选举结果:\ n \ n {result_str} \ n \ nwe想对您的耐心和对我们民主的兴趣表示感谢流程。有关更详细的结果,请访问我们的官方网站。\ n \ nbest问:\ n [选举委员会]
消息= client.messages.create(
from_ ='',
身体= message_body,
to =''
)print(f“消息发送的消息{message.sid}”)
停止火花
spark.stop()
输出:
{
_id: ObjectId('6487445c358709227a7e9c71'),
final_party: 'Republican',
'sum(electors)': 201
}
{
_id: ObjectId('6487445c358709227a7e9c72'),
final_party: 'Democrat',
'sum(electors)': 337
}
结果通知
用破折号可视化
最后一步涉及使用Dash可视化结果,这是用于构建Web分析应用程序的富有成效的Python框架。它使我们能够构建一张美国的互动地图,每个州都根据赢得大多数选票的政党进行着色:民主党人的蓝色和共和党人的红色。这使用户可以轻松,直观地理解选举结果。
-
连接到数据库:脚本连接到存储选举结果的数据库(特别是mongodb)。
-
定义地理数据:脚本包含具有其纬度和经度坐标的状态列表。这些数据将有助于在地图上准确绘制每个状态。
-
创建缩写词典的状态名称:该词典用于将完整的状态名称映射到其缩写(例如纽约到纽约),因为该地图使用缩写。
-
设置应用程序:脚本使用名为Dash的框架设置应用程序,该应用程序有助于构建Interactive Web应用程序。
-
定义应用程序布局:应用程序的布局定义为包括图形元素(在这种情况下为映射)。
-
更新地图:定义一个函数,每次调用地图都会更新地图。此功能可以做一些事情:
-
a。获得选举结果:该功能从数据库中获取选举结果。
-
b。过程结果:它处理这些结果以提取必要的数据。对于每个州,它都会获得赢得胜利的一方以及该方获得的选票百分比。各方被指定为稍后颜色代码的数值(共和党为0,民主党为1)。
-
c。准备悬停文本:这是悬停在地图上的状态时出现的文本。它显示了赢得胜利的一方和他们获得的选票百分比。
-
d。创建地图:该功能创建了美国的地图,每个州都根据赢得的政党进行了颜色编码(民主党的蓝色,共和党人红色)。
-
e。添加传奇:传说被添加到地图中,以指示哪种颜色与哪个聚会相对应。
-
f。调整布局:最后,该功能调整了地图的布局并将其返回。该地图显示在Web应用程序中。
我希望本指南能使您更好地了解MongoDB,Pyspark,Twilio和Dash如何用于构建高效,高性能的数据管道。