BigQuery: Creating a pipeline between MySQL and BigQuery with Airflow
#python #codenewbie #airflow #bigquery

What is BigQuery?

BigQuery is a fully managed enterprise data warehouse with built-in features such as machine learning, geospatial analysis, and business intelligence to help you manage and analyze your data. BigQuery is used to store and analyze data: federated queries read data from external sources, while streaming supports continuous data updates. That data can then be analyzed and explored with powerful tools such as BigQuery ML and BI Engine. BigQuery uses a columnar storage format that is optimized for analytical queries. It presents data in tables, rows, and columns, and provides full support for database transaction semantics.

Setting up the environment

Sign up for Google Cloud Platform and create a project, then select the project you want to work with in the Google Cloud Console. This is usually a drop-down menu in the Google Cloud Console nav. Enable the BigQuery API for the selected project. Create a service account and an IAM policy that grants access to BigQuery within the project. It is best to generate the key in JSON format.

Accessing BigQuery from Python

BigQuery is accessed using the google-cloud-bigquery Python library. Create a client that uses the given credentials to connect to BigQuery.
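A minimal sketch of that setup (assuming the service account key is saved as data/introduction-to-gcp.json, the same path used in the DAG further down):

import os
from google.cloud import bigquery

# point the client library at the service account key generated earlier
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "data/introduction-to-gcp.json"

# the client picks up the credentials and the project from the key file
client = bigquery.Client()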

BigQuery datasets

Before loading any data into BigQuery, use the create_dataset method to create a dataset in BigQuery that will hold the tables/data.
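A minimal sketch of that call (the dataset id 'dataset' and the europe-west2 location are the values used in the full DAG below; exists_ok=True makes the call a no-op when the dataset already exists):

from google.cloud import bigquery

client = bigquery.Client()
dataset = bigquery.Dataset("{}.{}".format(client.project, 'dataset'))
dataset.location = 'europe-west2'
client.create_dataset(dataset, exists_ok=True)  # creates the dataset only if it is missing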

Extracting data from the local DB - MySQL

MySQL holds the classicmodels DB, a sample database for a car retail company. From this database we can extract the following:

  • Customers that have made the most orders
  • Products with the highest number of purchases
  • Customers that have spent the most

That data is then extracted and transformed with pandas. The MySqlHook from airflow.hooks.mysql_hook is used to connect from Airflow to the local DB (MySQL). Setting the host to host.docker.internal allows access to the local database. The connection string/properties should be set in the Airflow web UI under the Admin/Connections menu.
# connecting to local db to query classicmodels db
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default', schema='classicmodels')
    connection = mysql_hook.get_conn()

MySQL connection properties (Admin > Connections in the Airflow web UI)
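A sketch of the extract step with pandas, assuming the connection obtained from the hook above (the query is the product-demand query used in the full DAG below):

import pandas as pd

query = """
SELECT productName, SUM(quantityOrdered) AS quantity_ordered
    FROM products, orderdetails
    WHERE products.productCode = orderdetails.productCode
    GROUP BY productName
    ORDER BY quantity_ordered DESC
    LIMIT 20;
"""

# read_sql_query runs the query over the hook connection and returns a dataframe
df = pd.read_sql_query(query, connection)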

Inserting into BigQuery

client.load_table_from_dataframe(df, 'table_name') is the method used to insert a dataframe, built from the query results, into the destination table in BigQuery.
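For example (a sketch assuming a dataframe df from the extract step and the dataset.product_dem table used later; calling .result() waits for the load job to finish):

from google.cloud import bigquery

client = bigquery.Client()
job = client.load_table_from_dataframe(df, 'dataset.product_dem')
job.result()  # block until the load job completes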

Automating with Airflow

The job runs every 20 minutes. The ETL is split into three tasks, creating_dataset >> truncating_tables >> inserting_data, which are executed in Airflow using the PythonVirtualenvOperator.

Airflow runs in Docker. The volume ./data:/opt/airflow/data is added to the docker-compose file to hold the classicmodels db and the JSON file containing the Google application credentials.
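The relevant fragment of the docker-compose file might look like this (only the added volume is shown; the surrounding service definition follows the standard Airflow docker-compose setup):

volumes:
  - ./data:/opt/airflow/data   # classicmodels db and the Google application credentials JSON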

The full code for the dag:


import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator # for executing python functions
from airflow.operators.python import PythonVirtualenvOperator # for executing python functions inside a virtualenv
from airflow.hooks.mysql_hook import MySqlHook # for connecting to local db
from datetime import timedelta

# function to create a dataset in bigquery to store data
def create_dataset():
    """
    Create a dataset in Google BigQuery if it does not exist.
    dataset_id: name of the dataset
    region_name: region name for the data center, i.e. europe-west2 for London
    """
    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound
    import os

    # setting application credentials to access bigquery
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "data/introduction-to-gcp.json"
    client = bigquery.Client()

    dataset_id = 'dataset'
    region_name = 'europe-west2'

    reference = client.dataset(dataset_id)

    try:
        client.get_dataset(reference)
    except NotFound:
        dataset = bigquery.Dataset(reference)
        dataset.location = region_name

        dataset = client.create_dataset(dataset)


# function to truncate tables before inserting data
def truncate():
    from google.cloud import bigquery
    import os

    # setting application credentials to access bigquery
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "data/introduction-to-gcp.json"

    # tables to truncate in bigquery (*this service task is billed)
    table1 = 'dataset.product_dem'
    table2 = 'dataset.toporders'
    table3 = 'dataset.customer_spe'

    # Truncate a Google BigQuery table
    client = bigquery.Client()
    query1 = ("DELETE FROM "+ table1 +" WHERE 1=1")
    query2 = ("DELETE FROM "+ table2 +" WHERE 1=1")
    query3 = ("DELETE FROM "+ table3 +" WHERE 1=1")

    job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
    # run each delete and wait for it to finish before the task exits
    client.query(query1, job_config=job_config).result()
    client.query(query2, job_config=job_config).result()
    client.query(query3, job_config=job_config).result()

# function to insert data from a pandas df to bigquery
def insert():
    from google.cloud import bigquery
    from airflow.hooks.mysql_hook import MySqlHook # for connecting to local db
    import pandas as pd
    import os

    # setting application credentials to access bigquery
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "data/introduction-to-gcp.json"

    # connecting to local db to query classicmodels db
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default', schema='classicmodels')
    connection = mysql_hook.get_conn()

    # querying the source db
    # products with the highest number of purchases
    query1 = """
    SELECT productName, SUM(quantityOrdered) AS quantity_ordered
        FROM products, orderdetails
        WHERE products.productCode = orderdetails.productCode
        GROUP BY productName
        ORDER BY quantity_ordered DESC
        LIMIT 20;
        """

    # customers that have made the most orders
    query2 = """
    SELECT contactFirstName, contactLastName, COUNT(*) AS number_of_orders
        FROM customers, orders
        WHERE customers.customerNumber = orders.customerNumber
        GROUP BY customers.customerNumber, contactFirstName, contactLastName
        ORDER BY number_of_orders DESC
        LIMIT 20;
        """

    # customers that have spent the most
    query3 = """
    SELECT contactFirstName, contactLastName, SUM(quantityOrdered*priceEach) AS total_amount_spent
        FROM customers, orders, orderdetails
        WHERE customers.customerNumber = orders.customerNumber AND orderdetails.orderNumber = orders.orderNumber
        GROUP BY customers.customerNumber, contactFirstName, contactLastName
        ORDER BY total_amount_spent DESC
        LIMIT 10;
        """

    # read each query result into a pandas dataframe
    df1 = pd.read_sql_query(query1, connection)
    df2 = pd.read_sql_query(query2, connection)
    df3 = pd.read_sql_query(query3, connection)

    client = bigquery.Client()
    # load the data to bigquery tables and wait for each load job to finish
    client.load_table_from_dataframe(df1, 'dataset.product_dem').result()
    client.load_table_from_dataframe(df2, 'dataset.toporders').result()
    client.load_table_from_dataframe(df3, 'dataset.customer_spe').result()


def message():
    print("data successfully loaded into gc-bigquery")


default_args = {
 'owner':'airflow',
 'depends_on_past' : False,
 'start_date': airflow.utils.dates.days_ago(7),
}

mysql_to_gcp = DAG(
    'mysql-to-gcp', #name of the dag
    default_args = default_args,
    schedule_interval = timedelta(minutes=20),
    catchup = False
)


creating_dataset = PythonVirtualenvOperator(
    task_id='creating-a-dataset-in-bigquery',
    python_callable = create_dataset,
    requirements = ["google-cloud-bigquery","google-cloud-bigquery-storage"],
    system_site_packages=False,
    dag = mysql_to_gcp,
)

truncating_tables = PythonVirtualenvOperator(
    task_id='truncating-tables-in-bigquery',
    python_callable = truncate,
    requirements = ["google-cloud-bigquery","google-cloud-bigquery-storage"],
    dag = mysql_to_gcp,
)

inserting_data = PythonVirtualenvOperator(
    task_id='inserting-data-into-bigquery',
    python_callable = insert,
    requirements = ["google-cloud-bigquery","google-cloud-bigquery-storage","pyarrow"], # pyarrow is required by load_table_from_dataframe
    dag = mysql_to_gcp,
)


message_out = PythonOperator(
    task_id = 'task-complete-message',
    python_callable = message,
    dag = mysql_to_gcp,
)

creating_dataset >> truncating_tables >> inserting_data >> message_out

The Airflow graph view of the complete dag

Airflow provides a variety of operators, such as the Airflow BigQuery operators, to help you manage your data. The Airflow BigQuery operators in particular are widely used, as they support data management by analyzing data and extracting meaningful insights. You can use the Airflow BigQuery operators to:

  1. Manage datasets
  2. Manage tables
  3. Run BigQuery jobs (see the sketch after this list)
  4. Validate data
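For instance, the truncate step above could be written with a BigQuery operator instead of a PythonVirtualenvOperator. This is a sketch, assuming the apache-airflow-providers-google package is installed and a Google Cloud connection is configured in Airflow:

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

truncate_product_dem = BigQueryInsertJobOperator(
    task_id='truncate-product-dem',
    configuration={
        "query": {
            "query": "DELETE FROM dataset.product_dem WHERE 1=1",
            "useLegacySql": False,
        }
    },
    dag=mysql_to_gcp,
)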