Ingest Data from RDS MySQL to Google BigQuery

In analytics, where queries over hundreds of gigabytes are the norm, performance is paramount and has a direct effect on your team's productivity: a query that runs for hours means days of iteration between business questions. At Foxintelligence, we needed to move from traditional relational databases, like Postgres and MySQL, to a columnar database solution. While an RDBMS like MySQL is great for regular transactional operations, it has significant drawbacks when it comes to real-time analytics on large amounts of data. We found that Google BigQuery delivers significantly better usability, performance, and cost for almost all of our analytical use cases, especially at scale.

While Amazon Redshift and Google BigQuery provide much of the same functionality, there are some fundamental differences in how the two operate, so you need to pick the right solution based on your data and your business.

Once we had decided which data warehouse to use, we had to replicate data from RDS MySQL to Google BigQuery. This post walks you through the process of building a data pipeline that achieves this replication between the two systems.

We used AWS Data Pipeline to export data from MySQL and feed it to BigQuery. The figure below summarises the entire workflow:



The pipeline starts on a defined schedule and period. It launches a spot instance that copies data from the MySQL database to CSV files (one folder per table) in an Amazon S3 bucket, and then sends an Amazon SNS notification once the copy activity completes successfully. The following is our pipeline that accomplishes this:
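As a rough, heavily abbreviated illustration of what such a definition looks like in code, the sketch below registers a similar pipeline with boto3. The object names, instance type, bucket, topic ARN, and schedule are placeholders rather than our actual configuration, and the database connection object is omitted for brevity:

import boto3

dp = boto3.client('datapipeline')

# Create an empty pipeline, then register its definition and activate it.
pipeline_id = dp.create_pipeline(name='rds-mysql-export',
                                 uniqueId='rds-mysql-export')['pipelineId']

objects = [
    # Run the export once a day (placeholder schedule).
    {'id': 'DailySchedule', 'name': 'DailySchedule', 'fields': [
        {'key': 'type', 'stringValue': 'Schedule'},
        {'key': 'period', 'stringValue': '1 Day'},
        {'key': 'startDateTime', 'stringValue': '2019-01-01T05:00:00'},
    ]},
    {'id': 'Default', 'name': 'Default', 'fields': [
        {'key': 'scheduleType', 'stringValue': 'cron'},
        {'key': 'schedule', 'refValue': 'DailySchedule'},
        {'key': 'role', 'stringValue': 'DataPipelineDefaultRole'},
        {'key': 'resourceRole', 'stringValue': 'DataPipelineDefaultResourceRole'},
    ]},
    # EC2 resource that performs the copy (spot settings omitted here).
    {'id': 'Ec2Instance', 'name': 'Ec2Instance', 'fields': [
        {'key': 'type', 'stringValue': 'Ec2Resource'},
        {'key': 'instanceType', 'stringValue': 'm3.medium'},
        {'key': 'terminateAfter', 'stringValue': '2 Hours'},
    ]},
    # MySQL source table (the database/connection object is omitted for brevity).
    {'id': 'SourceTable', 'name': 'SourceTable', 'fields': [
        {'key': 'type', 'stringValue': 'SqlDataNode'},
        {'key': 'table', 'stringValue': 'orders'},
        {'key': 'selectQuery', 'stringValue': 'SELECT * FROM orders'},
    ]},
    # CSV output folder on S3 (one folder per table).
    {'id': 'S3Output', 'name': 'S3Output', 'fields': [
        {'key': 'type', 'stringValue': 'S3DataNode'},
        {'key': 'directoryPath', 'stringValue': 's3://my-export-bucket/orders/'},
    ]},
    # Copy activity wiring source, destination, resource and the success notification.
    {'id': 'TableCopy', 'name': 'TableCopy', 'fields': [
        {'key': 'type', 'stringValue': 'CopyActivity'},
        {'key': 'input', 'refValue': 'SourceTable'},
        {'key': 'output', 'refValue': 'S3Output'},
        {'key': 'runsOn', 'refValue': 'Ec2Instance'},
        {'key': 'onSuccess', 'refValue': 'NotifySuccess'},
    ]},
    # SNS notification sent when the copy succeeds.
    {'id': 'NotifySuccess', 'name': 'NotifySuccess', 'fields': [
        {'key': 'type', 'stringValue': 'SnsAlarm'},
        {'key': 'topicArn', 'stringValue': 'arn:aws:sns:eu-west-1:123456789012:export-done'},
        {'key': 'subject', 'stringValue': 'RDS export finished'},
        {'key': 'message', 'stringValue': 'CSV files are available in S3.'},
        {'key': 'role', 'stringValue': 'DataPipelineDefaultRole'},
    ]},
]

dp.put_pipeline_definition(pipelineId=pipeline_id, pipelineObjects=objects)
dp.activate_pipeline(pipelineId=pipeline_id)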



Once the pipeline is finished, CSV files will be generated in the output S3 bucket:



The SNS notification triggers a Lambda function, which deploys a batch job based on a Docker image stored in our private Docker registry. The container uploads the CSV files from S3 to GCS and loads the data into BigQuery.

Note that you can also use the Storage Transfer Service to easily migrate your data from Amazon S3 to Cloud Storage.
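The Lambda function itself only needs to hand the work off to AWS Batch. A minimal sketch in Python could look like the following; the job queue and job definition names are placeholders:

import boto3

batch = boto3.client('batch')

def handler(event, context):
    # The SNS notification confirms that the Data Pipeline export finished.
    print('SNS message:', event['Records'][0]['Sns']['Message'])

    # Submit the containerised loader. The Docker image behind the job
    # definition uploads the CSV files to GCS and loads them into BigQuery.
    response = batch.submit_job(
        jobName='mysql-to-bigquery',
        jobQueue='data-pipeline-queue',          # placeholder queue name
        jobDefinition='csv-to-bigquery-loader',  # placeholder job definition
    )
    return response['jobId']

The container's entrypoint is the shell script below; for each exported table it copies the most recent CSV file from S3 to GCS, then triggers the BigQuery load: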

#!/bin/bash

echo "Download BigQuery credentials"
aws s3 cp s3://$GCP_AUTH_BUCKET/auth.json .

echo "Upload CSV files to GCS"
mkdir -p csv
rm -f tables

# Iterate over the per-table folders in the export bucket.
for raw in $(aws s3 ls s3://$S3_BUCKET/ | awk -F " " '{print $2}'); do
    table=${raw%/}
    # Skip empty entries and Data Pipeline internal folders (prefixed with df).
    if [[ $table != "" && $table != df* ]]; then
        echo "Table: $table"

        # Pick the most recent CSV export for this table.
        csv=$(aws s3 ls s3://$S3_BUCKET/$table/ | awk -F " " '{print $4}' | grep ^ | sort -r | head -n1)
        echo "CSV: $csv"

        # Keep track of the processed tables for the BigQuery load step.
        echo $table >> tables

        echo "Copy CSV from S3"
        aws s3 cp s3://$S3_BUCKET/$table/$csv csv/$table.csv

        echo "Upload CSV to GCS"
        gsutil cp csv/$table.csv gs://$GS_BUCKET/$table.csv
    fi
done

echo "Import CSV files into BigQuery"
python app.py

We wrote a Python script (app.py) to clean up the raw data (encoding issues), transform it (map MySQL data types to BigQuery data types), and load the CSV files into BigQuery:

import mysql.connector
import os
import time
from mysql.connector import Error
from google.cloud import bigquery

bigquery_client = bigquery.Client()

# Map MySQL column types to their BigQuery equivalents.
def mapToBigQueryDataType(columnType):
    if columnType.startswith('int'):
        return 'INT64'
    if columnType.startswith('varchar'):
        return 'STRING'
    if columnType.startswith('decimal'):
        return 'FLOAT64'
    if columnType.startswith('datetime'):
        return 'DATETIME'
    if columnType.startswith('text'):
        return 'STRING'
    if columnType.startswith('date'):
        return 'DATE'
    if columnType.startswith('time'):
        return 'TIME'

# Block until the load job finishes, surfacing any load errors.
def wait_for_job(job):
    while True:
        job.reload()
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        time.sleep(1)

try:
    conn = mysql.connector.connect(host=os.environ['MYSQL_HOST'],
                                   database=os.environ['MYSQL_DB'],
                                   user=os.environ['MYSQL_USER'],
                                   password=os.environ['MYSQL_PWD'])
    if conn.is_connected():
        print('Connected to MySQL database')

    # The 'tables' file is produced by the shell script above.
    lines = open('tables').read().split("\n")
    for tableName in lines:
        if not tableName:
            continue
        print('Table:', tableName)

        # Read the MySQL schema and translate it to a BigQuery schema.
        cursor = conn.cursor()
        cursor.execute('SHOW FIELDS FROM ' + os.environ['MYSQL_DB'] + '.' + tableName)
        rows = cursor.fetchall()

        schema = []
        for row in rows:
            schema.append(bigquery.SchemaField(row[0].replace('\'', ''), mapToBigQueryDataType(row[1])))

        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.CSV
        job_config.autodetect = True
        job_config.max_bad_records = 2
        job_config.allow_quoted_newlines = True
        job_config.schema = schema

        # Load the CSV file from GCS into the destination table.
        job = bigquery_client.load_table_from_uri(
            'gs://' + os.environ['GCE_BUCKET'] + '/' + tableName + '.csv',
            bigquery_client.dataset(os.environ['BQ_DATASET']).table(tableName),
            location=os.environ['BQ_LOCATION'],
            job_config=job_config)

        print('Loading data to BigQuery:', tableName)
        wait_for_job(job)

        print('Loaded {} rows into {}:{}.'.format(
            job.output_rows, os.environ['BQ_DATASET'], tableName))

except Error as e:
    print(e)
finally:
    conn.close()

As a result, the tables will be imported to BigQuery:
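To sanity-check a load from code, you can count the rows of the destination table with the same BigQuery client; a small sketch, where the dataset and table names are placeholders:

from google.cloud import bigquery

client = bigquery.Client()

# Count the rows of a freshly loaded table (placeholder dataset/table names).
query = 'SELECT COUNT(*) AS row_count FROM `my_dataset.orders`'
row = next(iter(client.query(query).result()))
print('Rows in BigQuery:', row.row_count)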



While this solution worked like a charm, we didn't stop there. Google Cloud announced the public beta release of the BigQuery Data Transfer Service, which automates data movement from data sources like Amazon S3 or Cloud Storage into BigQuery on a scheduled, managed basis. It was a great opportunity to test this service for managing recurring load jobs from Amazon S3 into BigQuery, as shown in the figure below:



This service comes with some trade-offs; for example, BigQuery cannot create tables as part of the data transfer process. Hence, we used a Lambda function to drop the old dataset and to create the destination tables and their schemas before running the transfer. The function's handler code is self-explanatory:

package main

import (
	"bufio"
	"context"
	"database/sql"
	"fmt"
	"os"

	"cloud.google.com/go/bigquery"
	"github.com/aws/aws-lambda-go/lambda"
	_ "github.com/go-sql-driver/mysql" // MySQL driver matching the DSN format used below
)

// RemoveDataSet, CreateDataSet, GetColumns and CreateBQTable are helper
// functions defined elsewhere in the package.

func handler(ctx context.Context) error {
	client, err := bigquery.NewClient(ctx, os.Getenv("PROJECT_ID"))
	if err != nil {
		return err
	}

	// Drop the old dataset and recreate it so the transfer starts from a clean slate.
	err = RemoveDataSet(client)
	if err != nil {
		return err
	}

	err = CreateDataSet(client)
	if err != nil {
		return err
	}

	uri := fmt.Sprintf("%s:%s@tcp(%s)/%s",
		os.Getenv("MYSQL_USERNAME"), os.Getenv("MYSQL_PASSWORD"),
		os.Getenv("MYSQL_HOST"), os.Getenv("MYSQL_DATABASE"))

	db, err := sql.Open("mysql", uri)
	if err != nil {
		return err
	}
	defer db.Close()

	file, err := os.Open("tables")
	if err != nil {
		return err
	}
	defer file.Close()

	// Recreate every destination table listed in the 'tables' file,
	// using the MySQL schema to derive the BigQuery one.
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		tableName := scanner.Text()
		fmt.Println("Table:", tableName)

		columns, _ := GetColumns(tableName, db)
		fmt.Println("Columns:", columns)

		CreateBQTable(tableName, columns, client)
	}

	if err := scanner.Err(); err != nil {
		return err
	}
	return nil
}

func main() {
	lambda.Start(handler)
}

The function is triggered by a CloudWatch Events rule once the data pipeline finishes exporting the CSV files:
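As a rough sketch, creating such a trigger with boto3 could look like the following; the rule name, schedule expression, and ARNs are placeholders, and a scheduled rule is just one way to approximate "after the export finishes":

import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

# Rule that fires when the nightly export is expected to be done (placeholder schedule).
events.put_rule(
    Name='csv-export-finished',
    ScheduleExpression='cron(0 6 * * ? *)',
    State='ENABLED',
)

# Point the rule at the Lambda function that recreates the BigQuery tables.
events.put_targets(
    Rule='csv-export-finished',
    Targets=[{
        'Id': 'prepare-bq-tables',
        'Arn': 'arn:aws:lambda:eu-west-1:123456789012:function:prepare-bq-tables',
    }],
)

# Allow CloudWatch Events to invoke the function.
lambda_client.add_permission(
    FunctionName='prepare-bq-tables',
    StatementId='allow-cloudwatch-events',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn='arn:aws:events:eu-west-1:123456789012:rule/csv-export-finished',
)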



Finally, we created a transfer job for each table to load the data from the S3 bucket into the corresponding BigQuery table:
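These transfer configurations can also be created programmatically. Below is a sketch using the BigQuery Data Transfer client; the project, dataset, table, bucket, and AWS credentials are placeholders, and the parameter keys reflect my understanding of the amazon_s3 data source, so double-check them against the data source documentation:

from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

# One transfer configuration per table: S3 folder -> existing BigQuery table.
transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id='my_dataset',
    display_name='s3-orders-to-bigquery',
    data_source_id='amazon_s3',
    schedule='every 24 hours',
    params={
        'destination_table_name_template': 'orders',
        'data_path': 's3://my-export-bucket/orders/*.csv',
        'access_key_id': 'AKIA...',        # placeholder AWS credentials
        'secret_access_key': '...',
        'file_format': 'CSV',
    },
)

transfer_config = transfer_client.create_transfer_config(
    parent=transfer_client.common_project_path('my-gcp-project'),
    transfer_config=transfer_config,
)
print('Created transfer config:', transfer_config.name)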



Using Google BigQuery to store hundreds of gigabytes of data internally (soon terabytes), with the ability to analyse it in a few seconds, gives us a massive push toward business intelligence and data-driven insights.

Like what you're reading? Check out my book and learn how to build, secure, deploy and manage production-ready Serverless applications in Golang with AWS Lambda.

Drop your comments, feedback, or suggestions below — or connect with me directly on Twitter @mlabouardy.
