I built an E(T)L-pipeline in Python to transfer data from MS SQL Server to PostgreSQL
Introduction
Motivation
For another article (which is still in the making) I required good sample data. The latest AdventureWorks database (2019's DW version) from MS SQL Server was a good choice in my opinion, but I struggled to connect it to some BI tool. Instead of jumping to another sample dataset that I knew would work (e.g. airline bookings for Postgres), I liked the idea of writing my own ETL job (Extract, Transform, Load) that shifts the whole data set to a separate PostgreSQL database. I am only doing some minor transformation, hence the 'T' in brackets in the title.
A core feature makes use of the to_sql() function provided by Pandas. It lets you upload a full DataFrame to a database and converts the column headers to field names automatically. You skip a lot of additional coding with this, but it comes at some cost, which I will describe at the end of this article.
You can find the code hosted on GitHub here. I also created a detailed tutorial in an IPython notebook.
Please note that the details about the code posted here on Medium (chapter 1) and in the tutorial notebook are almost identical. You may skip chapter 1 if you read the notebook.
The setup
- Operating system: Windows 10 (this will be relevant, as you might see in the pitfalls section)
- MS SQL Server with the AdventureWorksDW2019 sample database set up
- PostgreSQL with an empty database “adventureworksdw2019” created
- Python and Jupyter Notebooks, installed via Anaconda
Structure of this article
- Python code
There are three blocks to cover: the required libraries and how to connect to the databases, how to get table and schema information from the source database, and how to load everything from the source into the target database. I will describe the procedure for each block first and then display the code.
- Encoding pitfalls (WIN1252 and UTF-8)
There weren't many obstacles to overcome in this project, but this one was big.
- Critical appraisal
Is it any good? Maybe. If the code didn't tell you, you may find out here.
- (Presumably) Frequently asked questions
Some questions might arise after examining the code. I'll cover those in this chapter.
- Conclusion
Final words about this project.
1. Python code
The basic idea is simple:
- Connect to the source database and get schema and table information
- Connect to the target database and write all source tables into the target, respecting the schemas
And you will see that the process is actually straightforward. Just a few additional steps are necessary.
And now, let’s get started.
Python libraries and how to connect to the databases
Relevant libraries
- We import pandas, because we will create a DataFrame and use the function to_sql() to load the data to our target database.
- We’ll need pyodbc to connect to MS SQL Server.
- For credentials, which are stored in Environment Variables, we’ll make use of the os library.
- Regex (the re module), for some basic transformation of the table names.
- And lastly, create_engine from sqlalchemy to create our connection objects.
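Put together, the imports might look like this (a sketch of the setup, not copied verbatim from the notebook):
import os
import re
import pandas as pd
import pyodbc
from sqlalchemy import create_engine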
Creating engines
To connect to our databases, we create the database engines via sqlalchemy. Credentials are taken from our Environment Variables, and commonly shared database information (such as the database name or server name) is stored in separate variables for convenience.
To create the engine, we’re storing the database-URI for each database in a separate variable. The URI for sqlalchemy has the following structure: ‘database_system+connection_driver://username:password@host:port/database_name’
(In case of MS SQL Server you will have to add the driver at the end of the string: ‘?driver=SQL_Server_Driver’)
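A minimal sketch of the engine setup could look like the following. The environment variable names, hosts, ports and the ODBC driver string are assumptions here, so adjust them to your own setup:
# Credentials from Environment Variables (hypothetical variable names)
mssql_user = os.environ['MSSQL_USER']
mssql_password = os.environ['MSSQL_PASSWORD']
pg_user = os.environ['PG_USER']
pg_password = os.environ['PG_PASSWORD']
# Shared database information
mssql_db = 'AdventureWorksDW2019'
pg_db = 'adventureworksdw2019'
# URIs follow the structure described above; SQL Server additionally needs the driver
mssql_uri = f"mssql+pyodbc://{mssql_user}:{mssql_password}@localhost:1433/{mssql_db}?driver=ODBC+Driver+17+for+SQL+Server"
pg_uri = f"postgresql://{pg_user}:{pg_password}@localhost:5432/{pg_db}"
mssql_engine = create_engine(mssql_uri)
pg_engine = create_engine(pg_uri)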
I personally like to work with f-strings. They provide a very flexible way for inserting your expressions into any string. Just like this:
expression = 'WORLD'
print(f"Hello, {expression.lower()}!")
>>> Hello, world!
They work with single quotation marks as well, but I recommend using double quotes, because you may want to use single quotes for other purposes in the same string. Please keep in mind that f-strings are only available from Python 3.6 onwards.
How to get table and schema information in the source database
To automatically load our data into Postgres, we need information about tables and schemas from our source. As you can see, the first variable stores a SQL query string which we’ll use to retrieve the table, schema and view names of the original AdventureWorksDW2019 database.
We achieve this by concatenating queries for both tables and views with the UNION operator (lines 1–21). A simple JOIN of the schema will not suffice, because then we would need a more complicated way to get our table and view names into a single field.
Side note: This query is crafted for MS SQL Server, but you should be able to adjust it to any other source database as well. If you do so, make sure the result set has exactly 2 fields, where the first field holds the table names and the second holds the schema names. The simple reason is that we are going to store this info in a dictionary.
The next step is to create a connection to SQL Server (line 23) by assigning a new variable and calling the connect() method on the engine. This creates a connection object, which allows us to start querying.
Then we run the query defined above by calling the execute() method on our connection object and assign the result to a newly created variable (line 25).
That result is an object as well. Its fetchall() method (line 26) returns the result set as a list of tuples. I recommend transforming this list into a dictionary (line 27), which gives more flexibility when building the tables and schemas in the target database. Our desired dict will eventually look like this:
{'AdventureWorksDWBuildVersion': 'dbo',
'DatabaseLog': 'dbo',
'DimAccount': 'dbo',
...}
Finally, we store the schema names in a separate variable and eliminate duplicates simply by casting the values into a set (line 29). Sure, we just have 'dbo' as our only schema here, but keep in mind that this is not the case for regular relational databases. Hence, it is good practice to consider the schemas, regardless of the system.
To finish this process we close the connection (line 31).
We now have a dict with table name as key and schema name as value. And we have a set of schema names.
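Condensed into a sketch, this block looks roughly as follows. The query shown here is an approximation via INFORMATION_SCHEMA, not necessarily the exact one from the gist, and it assumes the mssql_engine created earlier:
from sqlalchemy import text
# Tables and views, each paired with their schema (exactly 2 fields)
get_tables_query = """
SELECT TABLE_NAME, TABLE_SCHEMA
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
UNION
SELECT TABLE_NAME, TABLE_SCHEMA
FROM INFORMATION_SCHEMA.VIEWS
"""
mssql_connection = mssql_engine.connect()                  # connection object
result = mssql_connection.execute(text(get_tables_query))  # run the query
rows = result.fetchall()                                    # list of (table, schema) tuples
tables_dict = {table: schema for table, schema in rows}     # e.g. {'DimDate': 'dbo', ...}
schemas = set(tables_dict.values())                         # duplicates removed
mssql_connection.close()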
How to load all information from the source to the target database
We start the final block with a for-loop in which we iterate through every table in our dict (line 1).
Then we start a connection to both SQL Server and Postgres (lines 3 and 4).
Next, let me draw your attention to lines 6, 7 and 8, regarding a variable called ‘table_split’. What are we doing here and why are we doing it?
I’ll start with a short example: One table in the AW database is called DimDate. To query the table later in Postgres, we must put its name in double quotes (this applies to fields as well):
SELECT "DateKey"
, "FullDateAlternateKey"
, "DayNumberOfWeek"
FROM "dbo"."DimDate"
This is a major inconvenience in my opinion, because as a Data Warehouse Engineer or Data Analyst you don’t want to wrap every object into quotes. To avoid that, the names must be lower-case in Postgres. But then, the table would be named dimdate, which I personally don’t like that much because I want the table’s attribute (the dim prefix) to be visually split from the actual name (but that’s just a personal taste). The simplest way is to add an underscore. Therefore, DimDate becomes dim_date.
We achieve this by using regex to split the name just before each uppercase letter (line 6). If DimDate is passed as table_name, the table_split variable becomes a list:
table_split = [t for t in re.split("([A-Z][^A-Z]*)", table_name) if t]
>>> ['Dim', 'Date']
We join this list with an underscore (line 7) and get a string:
table_split = '_'.join(table_split)
>>> 'Dim_Date'
And finally, we turn the string into lowercase (line 8):
table_split = table_split.lower()
>>> 'dim_date'
This is the minor transformation I mentioned at the beginning. The actual values of our result sets remain untouched and we’re not doing any other transformation work, hence it’s not a ‘full’ transformation as in ‘ETL’.
We’re now generating a new query string called full_table to retrieve all the data of the currently processed table. Again, an f-string makes life easier and combined with the dict, we can insert the schema and table name just as easily (lines 10–16).
Storing this query's result in a Pandas DataFrame is the next step (line 18). We call the read_sql() function and pass the above-mentioned query string and the MS SQL Server connection to it. Our result set is now stored as a regular DataFrame (table DimDate):
DateKey | FullDateAlternateKey | DayNumberOfWeek | ...
---------+----------------------+-----------------+----
20050101 | 2005-01-01 | 7 | ...
20050102 | 2005-01-02 | 1 | ...
20050103 | 2005-01-03 | 2 | ...
20050104 | 2005-01-04 | 3 | ...
20050105 | 2005-01-05 | 4 | ...
... | ... | ... | ...
The columns still contain uppercase letters, so let’s ensure that those are lowercase (line 19).
Finally, we call the to_sql() function on our DataFrame (line 20). The parameters we’re using are as follows:
- schema: Recall that the value of our current dict element is the schema name, so we insert it here. But since we did not transform the schema names the way we did the table names, we must append the lower() method to the string. We don't need to create or remove existing schemas; to_sql() handles this on its own.
- name: Here we insert the transformed table name, which is stored in the variable table_split.
- con: to_sql() can take an engine object or a connection object from sqlalchemy. I like consistency, so I pass the connection to our Postgres database.
- chunksize: Takes an integer as argument. Here you define the maximum number of rows processed in each iteration of to_sql(). Although optional, I recommend making use of it to keep the resource consumption of your machine low.
- index: We did not set an index in the DataFrame, so False (the default is True).
- index_label: See index.
- if_exists: What should happen if this table already exists? Since we did not craft an incremental load solution, we must replace the data every time this function is called. Otherwise the same data would be appended again and we would end up with lots of duplicates, which can happen if you run the code several times.
After our to_sql() function is finished, our coding job is basically done. We just have to ensure that after each iteration of our for loop the open connections are closed (lines 22 and 23). And once all tables are dumped to Postgres (i.e. the for loop has finished), we must dispose of the engines which we created at the very beginning (lines 26 and 27).
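Put together, the whole loading block boils down to something like the sketch below. It assumes the engines and the tables_dict from the previous sketches; the chunksize value is just an example, and the original gist adds print statements around these steps:
for table_name, schema in tables_dict.items():
    mssql_connection = mssql_engine.connect()
    pg_connection = pg_engine.connect()
    # 'DimDate' -> ['Dim', 'Date'] -> 'Dim_Date' -> 'dim_date'
    table_split = [t for t in re.split("([A-Z][^A-Z]*)", table_name) if t]
    table_split = '_'.join(table_split)
    table_split = table_split.lower()
    # Extract the full table from SQL Server into a DataFrame
    full_table = f"SELECT * FROM {schema}.{table_name}"
    df = pd.read_sql(full_table, mssql_connection)
    df.columns = df.columns.str.lower()
    # Load the DataFrame into Postgres
    df.to_sql(
        name=table_split,
        con=pg_connection,
        schema=schema.lower(),
        chunksize=1000,       # example value
        index=False,
        index_label=None,
        if_exists='replace',
    )
    mssql_connection.close()
    pg_connection.close()
mssql_engine.dispose()
pg_engine.dispose()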
2. Encoding pitfalls (WIN1252 and UTF-8)
Let’s talk about obstacles.
UnicodeEncodeError: 'charmap' codec can't encode characters in position 0-61: character maps to <undefined>
If you've come across this error, I feel you. It gave me a lot of headaches, because the data loads just fine until, at some table, it suddenly throws this error. So what's the problem here?
I am running Postgres on Windows (using PowerShell with psql) and you can see my locales in the table below:
List of databases
Name | Owner | Encoding | Collate
----------------------+----------+----------+--------------------
adventureworksdw2019 | Chris So | WIN1252 | German_Germany.1252
postgres | Chris So | WIN1252 | German_Germany.1252
template0 | Chris So | WIN1252 | German_Germany.1252
template1 | Chris So | WIN1252 | German_Germany.1252
All databases have a WIN encoding, which PostgreSQL picks up from the Windows system locale by default. The basic problem is that the AdventureWorks database contains characters that UTF-8 handles just fine, but that have no proper mapping in WIN1252.
What to do? We could set the encoding when creating the engine in Python, right? Just like this:
engine = create_engine(DATABASE_URI, encoding="cp1252")
Nope, this won’t work.
I am not absolutely sure, but I think to_sql() converts to UTF-8 automatically (please correct me if I'm wrong!). Hence, every time this function was called, it tried to write UTF-8 encoded tables to a WIN1252 database. The function has no encoding parameter, so I forced the change in my database with the following SQL command (a related question can be found on StackOverflow):
UPDATE pg_database
SET ENCODING = pg_char_to_encoding('UTF8')
WHERE datname = 'adventureworksdw2019';
The client encoding is not relevant here — I had to change the actual database encoding.
And this worked fine, but I don't recommend changing the encoding of your database if it already holds a large amount of data. In that case, creating a completely new DB with the correct encoding from the ground up is probably the way to go to avoid broken data.
3. Critical appraisal
Scalability
Is it scalable? To some degree, I guess. From my professional work I know how long it can take to load big data into a Data Warehouse. The drawback here is that I do a full refresh every time the code runs, which will result in long loading times when millions of rows must be written.
Additionally, if you let to_sql() upload all rows at once (i.e. don’t add a chunksize argument to the function call), your RAM can be consumed quickly. If this code runs on a server, you might not want to allocate all of the available memory.
In my humble opinion incremental loading is a must if you want to ensure a fast running process.
And we don't make use of object-oriented programming. This can become problematic if the project gets bigger and more complex (but the imperative approach is totally fine here).
No suitable error-handling
Quick implementation comes at the cost of stability. Exceptions are not caught, so if you come across any of them that are out of the scope of this article, you will have to find an individual solution.
Just for ad-hoc-loading
I make heavy use of print statements, since I worked in Jupyter Notebooks for this project and wanted quick responses during prototyping. As you can see, those messages inform me about several steps, which helped with debugging (yeah I know, it's not best practice, but bear with me here). If this code is considered for production, the print statements should be replaced by proper logging that writes the state into a logfile, and I would make use of pdb.
The mini-transformation isn’t that great
I must admit, as well as this works for table names like DimDate (recall that this will become dim_date), it works just as badly for table names like AdventureWorksDWBuildVersion. This will eventually become adventure_works_d_w_build_version, which is atrocious for querying.
Either such tables should be handled differently or, more simply, the underscore should just not be added at all. I’ll let you decide which way to go.
4. (Presumably) Frequently asked questions
Why didn’t you include real transformation?
to_sql() takes a 'dtype' argument, but it must be passed as a dict mapping each field to a datatype. One would therefore have to identify the datatypes either manually or write an extra function, which is additional work I wanted to avoid. The steps are therefore actually limited to Extract and Load.
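Just to illustrate what that would look like, here is a hypothetical call with made-up column names and types (building on the DataFrame and connection from the loading loop):
from sqlalchemy.types import Date, Integer
df.to_sql(
    name='dim_date',
    con=pg_connection,
    schema='dbo',
    if_exists='replace',
    index=False,
    dtype={
        'datekey': Integer(),            # explicit integer type
        'fulldatealternatekey': Date(),  # proper date instead of a string
    },
)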
Anyway, date-strings can be quickly converted with SQL once you’re querying in Postgres.
But if this was an actual production environment, I would add dbt to the whole process which makes transformation life a lot easier.
The only transformation-like step is the renaming of the tables and fields for easy querying. But dbt can handle this as well, so in future cases I would concentrate on loading raw data into the target-database and transform it at the end (which, by the way, is called ‘ELT’ — in case you didn’t already know that).
Why did you not use a scheduling-tool like Airflow or Luigi?
There is not that much data to load, the process is not complicated, and it is a one-time job. So there is no need to bring in the big guns.
The code could use some improvements, right?
Absolutely. First, my print statements are all over the place (or no state is written at all, depending on whether you're looking at the embedded code here or at my notebook). Additionally, I am assigning variables just for those print statements, thereby allocating additional resources. Those must be eliminated.
I am in favor of implementing some logging, which is stored in a separate database.
Second, adding a configuration file that stores all credentials outside the code would be a good idea.
There is already an AdventureWorks database for Postgres (found here), why did you not use that?
The above-mentioned Postgres version is OLTP-based. For my other article I needed a reporting database (OLAP, i.e. the DW version with Dim and Fact tables). But I tested it with the AdventureWorks2019 OLTP version and it works just as well. That's because the original schemas are copied along as well, and those are crucial in the OLTP version.
5. Conclusion
In this article I introduced you to an approach for extracting and loading raw data from SQL Server to Postgres, using Python 3 and Jupyter Notebooks. I did not transform the data because I wanted a simple solution. Most of the transformation concerns date-type fields, which currently arrive as strings but can be cast into dates when querying.
You might have noticed that the script itself is rather simple. This is true, but the real challenge was the error-free loading between the two database systems. If you encounter an obstacle that we handled in this small project, you might come across it again in a production environment. In that case you will be prepared and have the knowledge to overcome it much faster.
So, who is this solution for?
One-time dumps of databases are what this job is made for: if you need a quick transfer from SQL Server to Postgres, this solution might be for you. Additionally, once you know how the connectors work, you are not restricted to these two DB systems alone.
It is meant to dump a full database into another one. If you want incremental loading, or to load repeatedly several times a day, this solution might not be your first choice.
It was fun putting this code together, and I personally learned a lot about MS SQL Server, Postgres, connectors and encoding. Finally, I would recommend that every aspiring Data Engineer write their own EL code to learn how the connectors and the databases work and what pitfalls might come up.