What is BigQuery?
BigQuery is a fully managed enterprise data warehouse with built-in features such as machine learning, geospatial analysis, and business intelligence to help you manage and analyze your data. BigQuery is used to store and analyze data. Federated queries read data from external sources, while streaming allows continuous data updates. The data can then be analyzed and understood with powerful tools such as BigQuery ML and BI Engine. BigQuery uses a columnar storage format that is optimized for analytical queries. It presents data in tables, rows, and columns, and provides full support for database transaction semantics.
Setting up the environment
Sign up for Google Cloud Platform and create a project, then select the project you want to work with in the Google Cloud Console (usually from the project drop-down in the console's navigation bar). Enable the BigQuery API for the selected project. Create a service account and an IAM policy that allows it to access BigQuery in the project. It is best to generate the key in JSON format.
Accessing BigQuery from Python
BigQuery is accessed using the google-cloud-bigquery Python library. Create a client that uses the given credentials and connects to BigQuery.
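A minimal sketch of creating a client, assuming the service-account key downloaded earlier is saved as data/introduction-to-gcp.json (the same path used in the DAG below):
import os
from google.cloud import bigquery

# point the client library at the service-account JSON key
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "data/introduction-to-gcp.json"
# the client picks up the credentials and the project from the key file
client = bigquery.Client()
print(client.project)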
BigQuery datasets
Before loading data into BigQuery, use the create_dataset method to create a dataset in BigQuery that will store the tables/data, as sketched below.
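A minimal sketch, assuming the client from above, a dataset named dataset and the europe-west2 region (the same values used in the DAG below); exists_ok=True is a slight variation on the try/except NotFound pattern used later:
from google.cloud import bigquery

client = bigquery.Client()
dataset = bigquery.Dataset(client.dataset('dataset'))  # reference to the dataset
dataset.location = 'europe-west2'
# exists_ok=True makes the call a no-op if the dataset already exists
client.create_dataset(dataset, exists_ok=True)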
Extracting data from the local DB (MySQL)
MySQL contains the classicmodels DB, which is the database of a car retail company. From this database we can extract the following:
- The customers that have made the most orders
- The products with the highest number of purchases
- The customers that have spent the most
That data is then extracted and transformed using pandas.
from airflow.hooks.mysql_hook import MySqlHook
is used to connect to the local DB (MySQL) from Airflow. Setting the host to host.docker.internal allows Airflow to reach the local database. The connection string/properties should be set in the Airflow web UI under the Admin > Connections menu.
# connecting to local db to query classicmodels db
mysql_hook = MySqlHook(mysql_conn_id='mysql_default', schema='classicmodels')
connection = mysql_hook.get_conn()
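As a sketch of the pandas step (assuming the connection created above), each query result can be read straight into a DataFrame; the query shown is the "products with the highest number of purchases" query from the DAG below:
import pandas as pd

query = """
SELECT productName, SUM(quantityOrdered) AS quantity_ordered
FROM products, orderdetails
WHERE products.productCode = orderdetails.productCode
GROUP BY productName
ORDER BY quantity_ordered DESC
LIMIT 20;
"""
# run the query against classicmodels and load the result into a dataframe
df = pd.read_sql_query(query, connection)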
Inserting into BigQuery
client.load_table_from_dataframe(df, 'table_name')
is the method used to insert a DataFrame built from the queries above into the destination table in BigQuery.
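A minimal sketch, assuming the df from the previous step and the dataset.product_dem table used later in the DAG; waiting on the returned load job ensures the load has finished before moving on:
from google.cloud import bigquery

client = bigquery.Client()
# start the load job and block until it completes
job = client.load_table_from_dataframe(df, 'dataset.product_dem')
job.result()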
Automating with Airflow
The job runs every 20 minutes. The ETL is split into three tasks, creating_dataset >> truncating_tables >> inserting_data, which are executed in Airflow using the PythonVirtualenvOperator.
Airflow runs in Docker. The volume ./data:/opt/airflow/data is added to the docker-compose file to store the classicmodels db and the JSON file containing the Google application credentials.
The full code for the DAG
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator  # for executing python functions
from airflow.operators.python import PythonVirtualenvOperator  # for running tasks in virtualenvs
from airflow.hooks.mysql_hook import MySqlHook  # for connecting to the local db
from datetime import timedelta

# function to create a dataset in bigquery to store the data
def create_dataset():
    """
    Create a dataset in Google BigQuery if it does not exist.
    dataset_id: name of the dataset
    region_name: region name of the data center, i.e. europe-west2 for London
    """
    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound
    import os
    # setting application credentials to access bigquery
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "data/introduction-to-gcp.json"
    client = bigquery.Client()
    dataset_id = 'dataset'
    region_name = 'europe-west2'
    reference = client.dataset(dataset_id)
    try:
        client.get_dataset(reference)
    except NotFound:
        dataset = bigquery.Dataset(reference)
        dataset.location = region_name
        dataset = client.create_dataset(dataset)

# function to truncate the tables before inserting data
def truncate():
    from google.cloud import bigquery
    import os
    # setting application credentials to access bigquery
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "data/introduction-to-gcp.json"
    # tables to truncate in bigquery (*this service task is billed)
    table1 = 'dataset.product_dem'
    table2 = 'dataset.toporders'
    table3 = 'dataset.customer_spe'
    # truncate the Google BigQuery tables
    client = bigquery.Client()
    query1 = "DELETE FROM " + table1 + " WHERE 1=1"
    query2 = "DELETE FROM " + table2 + " WHERE 1=1"
    query3 = "DELETE FROM " + table3 + " WHERE 1=1"
    job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
    # wait for each query job to finish before the task exits
    client.query(query1, job_config=job_config).result()
    client.query(query2, job_config=job_config).result()
    client.query(query3, job_config=job_config).result()

# function to insert data from pandas dataframes into bigquery
def insert():
    from google.cloud import bigquery
    from airflow.hooks.mysql_hook import MySqlHook  # for connecting to the local db
    import pandas as pd
    import os
    # setting application credentials to access bigquery
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "data/introduction-to-gcp.json"
    # connecting to the local db to query the classicmodels db
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default', schema='classicmodels')
    connection = mysql_hook.get_conn()
    # querying the source db
    # products with the highest number of purchases
    query1 = """
        SELECT productName, SUM(quantityOrdered) AS quantity_ordered
        FROM products, orderdetails
        WHERE products.productCode = orderdetails.productCode
        GROUP BY productName
        ORDER BY quantity_ordered DESC
        LIMIT 20;
    """
    # customers that have made the most orders
    query2 = """
        SELECT contactFirstName, contactLastName, COUNT(*) AS number_of_orders
        FROM customers, orders
        WHERE customers.customerNumber = orders.customerNumber
        GROUP BY customerName
        ORDER BY number_of_orders DESC
        LIMIT 20;
    """
    # customers that have spent the most
    query3 = """
        SELECT contactFirstName, contactLastName, SUM(quantityOrdered*priceEach) AS total_amount_spent
        FROM customers, orders, orderdetails
        WHERE customers.customerNumber = orders.customerNumber AND orderdetails.orderNumber = orders.orderNumber
        GROUP BY customerName
        ORDER BY total_amount_spent DESC
        LIMIT 10;
    """
    df1 = pd.DataFrame(pd.read_sql_query(query1, connection))
    df2 = pd.DataFrame(pd.read_sql_query(query2, connection))
    df3 = pd.DataFrame(pd.read_sql_query(query3, connection))
    client = bigquery.Client()
    # load the dataframes into the bigquery tables and wait for each load job to finish
    client.load_table_from_dataframe(df1, 'dataset.product_dem').result()
    client.load_table_from_dataframe(df2, 'dataset.toporders').result()
    client.load_table_from_dataframe(df3, 'dataset.customer_spe').result()

# function to log a completion message
def message():
    print("data successfully loaded into gc-bigquery")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(7),
}

mysql_to_gcp = DAG(
    'mysql-to-gcp',  # name of the dag
    default_args=default_args,
    schedule_interval=timedelta(minutes=20),
    catchup=False,
)

creating_dataset = PythonVirtualenvOperator(
    task_id='creating-a-dataset-in-bigquery',
    python_callable=create_dataset,
    requirements=["google-cloud-bigquery", "google-cloud-bigquery-storage"],
    system_site_packages=False,
    dag=mysql_to_gcp,
)

truncating_tables = PythonVirtualenvOperator(
    task_id='truncating-tables-in-bigquery',
    python_callable=truncate,
    requirements=["google-cloud-bigquery", "google-cloud-bigquery-storage"],
    dag=mysql_to_gcp,
)

inserting_data = PythonVirtualenvOperator(
    task_id='inserting-data-into-bigquery',
    python_callable=insert,
    requirements=["google-cloud-bigquery", "google-cloud-bigquery-storage"],
    dag=mysql_to_gcp,
)

message_out = PythonOperator(
    task_id='task-complete-message',
    python_callable=message,
    dag=mysql_to_gcp,
)

creating_dataset >> truncating_tables >> inserting_data >> message_out
The complete graph of the DAG in Airflow
Airflow provides a variety of operators, such as the Airflow BigQuery operators, to help you manage your data. The Airflow BigQuery operators in particular are widely used because they help with data management by analyzing and extracting meaningful insights. You can use the Airflow BigQuery operators to do the following (a short sketch follows the list):
- Manage datasets
- Manage tables
- Run BigQuery jobs
- Validate data
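A hedged sketch of these operators, assuming the apache-airflow-providers-google package is installed; the DAG name, task ids, table and SQL below are illustrative only and are not part of the pipeline above:
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryInsertJobOperator,
    BigQueryCheckOperator,
)

with DAG(
    'bigquery-operators-demo',  # illustrative DAG name
    start_date=days_ago(1),
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    # manage datasets: create an empty dataset
    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id='create-dataset',
        dataset_id='dataset',
        location='europe-west2',
    )

    # run a BigQuery job: execute a SQL statement as a query job
    run_query = BigQueryInsertJobOperator(
        task_id='run-query',
        configuration={
            "query": {
                "query": "SELECT COUNT(*) FROM `dataset.product_dem`",
                "useLegacySql": False,
            },
        },
    )

    # validate data: fail the task if the table is empty
    check_rows = BigQueryCheckOperator(
        task_id='check-rows',
        sql="SELECT COUNT(*) FROM `dataset.product_dem`",
        use_legacy_sql=False,
    )

    create_dataset >> run_query >> check_rows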