We use pandas and psycopg2 together to connect to PostgreSQL. psycopg2 is a package that allows us to create a connection to PostgreSQL databases from Python, and we will use the sqlio module within pandas to interact with the database.

If you haven't installed the psycopg2 package yet, simply install it with pip install psycopg2 in your terminal.

Database Connection

First, we create a connection object conn so we can call it later whenever we need it:

import psycopg2

conn = psycopg2.connect(
    dbname="dbname",
    user="username",
    password="password",
    host="postgresql_host",
    port="443"
)

Store SQL Queries

Instead of writing SQL queries inline inside a function call, we store them in Python variables so we can pass them to the sqlio functions:

sql_query = """
            SELECT * FROM your_table_name
            WHERE ds = '${bizdate}';
            """

Execute SQL Queries

Now we can use sqlio from pandas.io.sql to execute the SQL query we wrote. It returns a pandas DataFrame, so we can simply assign the result to a variable:

import pandas.io.sql as sqlio

df = sqlio.read_sql_query(sql_query.replace('${bizdate}', bizdate), conn)

Documentation of read_sql_query: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
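As an alternative to string replacement, read_sql_query also accepts a params argument, which lets the database driver bind values safely. The sketch below uses an in-memory SQLite database purely so it runs without a PostgreSQL server; the table and rows are made up for illustration, and with psycopg2 the placeholder style would be %(bizdate)s rather than ?:

```python
import sqlite3

import pandas as pd

# In-memory stand-in for the warehouse table (hypothetical data).
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE your_table_name (ds TEXT, value INTEGER)")
conn.execute("INSERT INTO your_table_name VALUES ('20191225', 1), ('20191226', 2)")

# Let the driver bind the partition value instead of editing the SQL string.
df = pd.read_sql_query(
    "SELECT * FROM your_table_name WHERE ds = ?",
    conn,
    params=('20191225',),
)
print(df)  # only the '20191225' row survives the filter
conn.close()
```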

Notice that I used a .replace() method on sql_query. PostgreSQL is often used in a data warehouse, where the data is stored in partitions. By replacing the partition placeholder, we can execute the command against the correct partition. In my case, the partitions are named with yesterday's date in a fixed format (e.g. 20191225); to create the variable bizdate, use:

import datetime
bizdate = (datetime.date.today() - datetime.timedelta(1)).strftime('%Y%m%d')
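To see the two pieces together, here is a quick, self-contained check of the format string and the placeholder substitution, pinned to a fixed date (2019-12-26) so the result matches the 20191225 example:

```python
import datetime

# Pinned "today" so the example is reproducible; the real script
# uses datetime.date.today() instead.
today = datetime.date(2019, 12, 26)
bizdate = (today - datetime.timedelta(1)).strftime('%Y%m%d')
print(bizdate)  # 20191225

sql_query = """
            SELECT * FROM your_table_name
            WHERE ds = '${bizdate}';
            """
# The placeholder is swapped for the concrete partition name.
print(sql_query.replace('${bizdate}', bizdate))
```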

Output DataFrame

After some data cleaning, we can export the DataFrame df to other formats such as .csv files, or write it to another database such as MySQL:

# export as csv files
df.to_csv('./csv/df.csv', index=False, encoding='utf-8')

# write to MySQL databases
from sqlalchemy import create_engine

host = 'mysql_host'
port = 3306
db = 'dbname'
user = 'username'
password = 'password'

engine = create_engine(f"mysql+mysqlconnector://{user}:{password}@{host}:{port}/{db}")

try:
    df.to_sql('your_target_table', con=engine, if_exists='replace', index=False)
    print('Success!')
except Exception as e:
    print(e)
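Neither step can be fully exercised without a live server, but the CSV half is easy to sanity-check in isolation. The sketch below round-trips a small made-up DataFrame through an in-memory buffer instead of a file on disk, and also shows the connection URL the engine is built from, using the placeholder credentials above:

```python
import io

import pandas as pd

# Tiny made-up frame standing in for the query result.
df = pd.DataFrame({'ds': ['20191225', '20191225'], 'value': [1, 2]})

# Round-trip through an in-memory text buffer instead of a file on disk.
buf = io.StringIO()
df.to_csv(buf, index=False)
buf.seek(0)
restored = pd.read_csv(buf, dtype={'ds': str})
print(restored.equals(df))  # True

# The SQLAlchemy URL is just a formatted string; these are placeholders.
user, password, host, port, db = 'username', 'password', 'mysql_host', 3306, 'dbname'
url = f"mysql+mysqlconnector://{user}:{password}@{host}:{port}/{db}"
print(url)
```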

Automation

Further, the whole data-fetching process can be automated with crontab.
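For example, assuming the complete script below is saved as get_data.py (both paths here are placeholders for your own), a crontab entry like this would run it every morning at 01:00 and append its output to a log file:

```shell
# added via `crontab -e`; minute hour day month weekday command
0 1 * * * /usr/bin/python3 /path/to/get_data.py >> /path/to/get_data.log 2>&1
```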

Complete Code

import datetime
import time
import psycopg2
import pandas.io.sql as sqlio


def get_data():

    bizdate = (datetime.date.today() - datetime.timedelta(1)).strftime('%Y%m%d')
    print('Define bizdate = ' + bizdate)

    print('Create Database Connection')
    conn = psycopg2.connect(
        dbname="dbname",
        user="username",
        password="password",
        host="postgresql_host",
        port="443"
    )

    print('Read SQL Queries')
    sql_query = """
            SELECT * FROM your_table_name
            WHERE ds = '${bizdate}';
            """

    print('Fetch Data')
    df = sqlio.read_sql_query(sql_query.replace('${bizdate}', bizdate), conn)

    print('Export CSV Files')
    df.to_csv('./csv/weixin_xiaoxi.csv', index=False, encoding='utf-8')

    print('Close Database Connection')
    conn.close()

if __name__ == "__main__":
    get_data()
    print('Finish!')
    now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print(now)
    print('==================================')