Monday, August 10, 2015

HADOOP - Loading data via Hive

1. Background

After spending a few days getting my data into a MariaDB instance I realized that I wasn't going to be able to pull that data into my SAP HANA One instance. I looked into creating a generic ODBC connection but it appears that I am unable to log into the console of the OS to load any libraries I would need.

For the last day or so I have been trying to load data into Hadoop. At first I tried Sqoop, since it can extract data from MariaDB. But I was spending too much time trying to figure out why Sqoop was saving the files to odd locations in Hadoop, and I wasn't getting far with the actual load. So I took a step back and used the create table/load functions in Hive to import what I need.

Hive provides SQL-like access to data stored within the Hadoop file system (HDFS). Once I load data, I can go back and use basic SQL statements to validate what was imported into HDFS.

First I need to go over some of the changes I made to the process.

2. Updating the Stock Market List

The original list of stock market symbols didn't include the name of the company or the country where the company is headquartered. So I updated the symbol extraction script to include these new fields. 
import string,io
import urllib2
from bs4 import BeautifulSoup

global f

def download_page(url):
    aurl = urllib2.urlopen(url)
    soup = BeautifulSoup(aurl.read(),"html.parser")

    print url

    # The second table on each page holds the company list
    for row in soup('table')[1]('tr'):
        tds = row('td')
        if (len(tds) > 0) :
            f.write(tds[1].string + '\t')
            f.write(tds[2].string + '\t')
            f.write(tds[0].string or u"No Name")
            f.write(u'\n')

f = io.open('stock_names.txt', 'w', encoding='utf8')
#f = open('stock_names.txt', 'w')

url_part1 = 'http://en.wikipedia.org/wiki/Companies_listed_on_the_New_York_Stock_Exchange_'
url = url_part1 + '(0-9)'
download_page(url)

for letter in string.uppercase[:26]:
    url_part2 = letter
    url = url_part1 + '(' + letter + ')'

    download_page(url)

f.close()

Here are the lines I added:
f.write(tds[1].string + '\t')
f.write(tds[2].string + '\t')
f.write(tds[0].string or u"No Name")
f.write(u'\n')

This writes the symbol tds[1] first, then adds the country tds[2] and the name tds[0] to each line. If the name is blank, the text “No Name” is written instead.

When I'm done, the file looks like this (note the tab delimited format):
DDD USA 3D Systems Corporation
MMM USA 3M Company
WBAI USA 500.com
WUBA China 58.com Inc.
CAS USA A. M. Castle & Co.
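
Since both Hive and the download script later in this post expect exactly three tab-separated fields per line, it can be worth checking the file before loading it. The following is a minimal sketch in Python 3 (the scripts in this post are Python 2); the helper name parse_symbol_lines and the broken sample row are made up for illustration.

```python
def parse_symbol_lines(lines):
    """Split tab-delimited lines into (symbol, country, name) tuples.

    Lines that do not have exactly three non-empty fields are returned
    separately so they can be inspected before the file is loaded.
    """
    good, bad = [], []
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        if len(fields) == 3 and all(fields):
            good.append(tuple(fields))
        else:
            bad.append(line)
    return good, bad


sample = [
    "DDD\tUSA\t3D Systems Corporation\n",
    "MMM\tUSA\t3M Company\n",
    "WUBA\tChina\n",  # hypothetical broken row: the name column is missing
]
good, bad = parse_symbol_lines(sample)
print("%d good rows, %d bad rows" % (len(good), len(bad)))
```

To check the real file, the same function can be given the lines of stock_names.txt opened with encoding='utf8'.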

This data is then easily loaded into Hadoop via this script that utilizes the Hive server (this is from an example in the Apache Hive documentation):
# Author...: J. Haynes
# Purpose..: Copy Stock Symbols file into Hive/Hadoop

import sys

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

try:
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)

    client = ThriftHive.Client(protocol)
    transport.open()
    client.execute("CREATE TABLE IF NOT EXISTS zsym(Symbol STRING, Country STRING, SymName STRING) row format delimited fields terminated by '\t' stored as textfile")
    client.execute("LOAD DATA LOCAL INPATH '/home/hadoopuser/stockdata/stock_names.txt' OVERWRITE INTO TABLE zsym")

    # Read the table back one row at a time
    client.execute("SELECT * FROM zsym")
    while (1):
        row = client.fetchOne()
        if (row == None):
            break
        print row

    # Read the table back again in a single call
    client.execute("SELECT * FROM zsym")
    print client.fetchAll()

    transport.close()

except Thrift.TException, tx:
    print '%s' % (tx.message)


This script talks to the 'Thrift' server, which is really just Hive running as a server on the system (the command to start it is hive --service hiveserver, which comes up on port 10000).

The first step creates a table named 'zsym' and sets the row format so that Hive reads the file as tab-delimited text. The second step performs the import; because of the OVERWRITE keyword, any data already in the table is replaced.
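
The two statements follow a pattern that repeats for every table loaded this way. As a rough sketch (Python 3, with a hypothetical helper named build_load_statements that is not part of Hive or Thrift), the HiveQL strings could be assembled from a column list like this:

```python
def build_load_statements(table, columns, delimiter, local_path):
    """Return the (CREATE TABLE, LOAD DATA) HiveQL strings for a text table.

    The values are pasted into the SQL without any escaping, so this is
    only suitable for trusted, hard-coded inputs.
    """
    column_sql = ", ".join("%s %s" % (name, hive_type) for name, hive_type in columns)
    create = (
        "CREATE TABLE IF NOT EXISTS %s(%s) "
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY '%s' "
        "STORED AS TEXTFILE" % (table, column_sql, delimiter)
    )
    load = "LOAD DATA LOCAL INPATH '%s' OVERWRITE INTO TABLE %s" % (local_path, table)
    return create, load


create_sql, load_sql = build_load_statements(
    "zsym",
    [("Symbol", "STRING"), ("Country", "STRING"), ("SymName", "STRING")],
    "\t",
    "/home/hadoopuser/stockdata/stock_names.txt",
)
print(create_sql)
print(load_sql)
```

Each string would then be passed to client.execute() exactly as in the script above.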

After the data is loaded, I can go back into the Hive command shell and review the data. Here is a quick example:
hive> select * from zsym limit 20;
OK
DDD USA 3D Systems Corporation
MMM USA 3M Company
WBAI USA 500.com
WUBA China 58.com Inc.
CAS USA A. M. Castle & Co.
AOS USA A. O. Smith Corporation
ATEN USA A10 Networks, Inc.
AIR USA AAR Corporation

Here is another check on just the country field:
hive> select * from zsym where country = 'Switzerland';
OK
ABB Switzerland ABB LTD.
AWH Switzerland Allied World Assurance Company Holdings, AG
CS Switzerland Credit Suisse Group AG
MTD Switzerland Mettler Toledo International Inc.
NVS Switzerland Novartis AG


3. Updating the Stock Market History Data

On review, I also noticed that I wasn't pulling the full history for the stock market data. So I updated the original script to include a wider date range (the details are at https://code.google.com/p/yahoo-finance-managed/wiki/csvHistQuotesDownload)

Here is the updated script to pull that data:
import string,csv
import urllib2
from bs4 import BeautifulSoup

global f

# https://code.google.com/p/yahoo-finance-managed/wiki/csvHistQuotesDownload
url_part1 = 'http://ichart.finance.yahoo.com/table.csv?s='
url_part2 = '&d=8&e=1&f=2015&g=d&a=1&b=1&c=1995&ignore=.csv'

print "Starting"

f = open('stock_names.txt', 'r')
file_content = f.readlines()

count = 1
print "About %d tickers will be downloaded" % len(file_content)

for row in file_content:
    ticker, country, name = row.strip().split("\t")
    url = url_part1 + ticker + url_part2
    print "\nTrying-->" + url

    try:
        # This will cause exception on a 404
        response = urllib2.urlopen(url)

        print "Downloading ticker %s (%d out of %d)" % (ticker, count, len(file_content))

        count = count + 1
        history_file = open('history/' + ticker + '.csv', 'w')
        history_file.write(response.read())
        history_file.close()

    except Exception, e:
        # Skip any ticker that cannot be downloaded
        pass

f.close()
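
The date range is controlled entirely by the query string in url_part2. Here is a small sketch (Python 3) of how such a URL could be built from real dates. It rests on an assumption that is not confirmed in this post: that a/b/c are the start month/day/year, d/e/f are the end month/day/year, months are counted from zero, and g=d requests daily rows. The function name history_url is made up for illustration.

```python
from datetime import date

BASE_URL = "http://ichart.finance.yahoo.com/table.csv"


def history_url(ticker, start, end):
    """Build a history download URL for one ticker.

    Assumption: months are zero-based in the query string, so January
    is 0 and September is 8.
    """
    params = [
        ("s", ticker),
        ("d", end.month - 1), ("e", end.day), ("f", end.year),
        ("g", "d"),  # assumed to mean daily quotes
        ("a", start.month - 1), ("b", start.day), ("c", start.year),
        ("ignore", ".csv"),
    ]
    return BASE_URL + "?" + "&".join("%s=%s" % (key, value) for key, value in params)


url = history_url("MMM", date(1995, 2, 1), date(2015, 9, 1))
print(url)
```

Under that assumption, the hard-coded values in the script above would correspond to February 1, 1995 through September 1, 2015.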

The other issue I encountered deals with how Hive loads data. When I run the script above, it populates an entire directory with csv files in the format shown here (note the header line, which we will correct for below):
Date,Open,High,Low,Close,Volume,Adj Close
2015-08-07,106.349998,107.110001,106.040001,106.620003,311800,106.620003
2015-08-06,109.370003,109.419998,105.940002,106.18,401400,106.18
2015-08-05,106.949997,109.540001,106.080002,109.370003,539800,109.370003

Hive has the ability to load all of the files in a given directory (a really handy feature). The problem is Hive doesn't know that each file represents a separate stock symbol. To fix this I wrote the script below which creates a new set of csv files with the symbol added to the beginning of each data line.
#!/usr/bin/env python
# Name.....: fix_dat.py
# Author...: J. Haynes
# Purpose..: Fix stock data files

import sys,glob,os

os.chdir("/home/hadoopuser/stockdata/history")
for file in glob.glob("*.csv"):
    #print(file)

    # the symbol is the file name without its extension
    symbol = os.path.splitext(file)[0]

    # open the text file for reading
    f1 = open(file, 'r')
    # Skip past the header in the file
    next(f1)
    # the 'new' subdirectory must already exist
    newfilenm = 'new/' + file + "_new"
    f2 = open(newfilenm, 'w')

    # step through each line and add the symbol to the front
    for row in f1:
        f2.write(symbol + ',' + row)

    # close both files before moving on to the next one
    f1.close()
    f2.close()

print "Done!"


This script takes all of the files in the history directory and creates a new file with the stock symbol added. Specifically, it takes the name of the file, strips off the extension, and adds it to the beginning of each line like this (note that the header line from before was skipped with the 'next(f1)' command):
ESI,2015-08-07,2.99,3.45,2.98,3.02,567600,3.02
ESI,2015-08-06,3.00,3.09,2.92,3.00,520500,3.00
ESI,2015-08-05,3.00,3.08,2.95,3.00,487100,3.00
ESI,2015-08-04,3.01,3.06,2.82,3.02,834600,3.02
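
The transformation itself can be tried on a couple of lines in memory before running it over the whole directory. This is only a sketch in Python 3, and the function name add_symbol is made up for illustration; it applies the same rule as the script: drop the header, then put the file name without its extension in front of every row.

```python
import os


def add_symbol(filename, lines):
    """Return the data rows of one history file, each prefixed with the symbol."""
    symbol = os.path.splitext(os.path.basename(filename))[0]
    rows = iter(lines)
    next(rows, None)  # skip the header line; tolerate an empty file
    return [symbol + "," + row for row in rows]


sample = [
    "Date,Open,High,Low,Close,Volume,Adj Close\n",
    "2015-08-07,2.99,3.45,2.98,3.02,567600,3.02\n",
    "2015-08-06,3.00,3.09,2.92,3.00,520500,3.00\n",
]
for line in add_symbol("ESI.csv", sample):
    print(line, end="")
```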

Now that an entire directory of files has been updated, it can be loaded into Hadoop with this script:

#!/usr/bin/env python
# Name.....: updt_dat.py
# Author...: J. Haynes
# Purpose..: Copy stock data files into Hive/Hadoop

import sys

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

# Example of the data to load
#Date,Open,High,Low,Close,Volume,Adj Close
#2010-01-28,6.38,6.38,6.12,6.30,61000,5.68551

try:
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)

    client = ThriftHive.Client(protocol)
    transport.open()
    client.execute("DROP TABLE IF EXISTS zdat PURGE")
    client.execute("CREATE TABLE IF NOT EXISTS zdat(symbol STRING, zDate DATE, Open DECIMAL(10,7), High DECIMAL(10,7),Low DECIMAL(10,7),Close DECIMAL(10,7),Volume DECIMAL(10,7),AdjClose DECIMAL(10,7)) row format delimited fields terminated by ',' stored as textfile")
    client.execute("LOAD DATA LOCAL INPATH '/home/hadoopuser/stockdata/history/new' OVERWRITE INTO TABLE zdat")

    # Read the table back one row at a time
    client.execute("SELECT * FROM zdat")
    while (1):
        row = client.fetchOne()
        if (row == None):
            break
        print row

    # Read the table back again in a single call
    client.execute("SELECT * FROM zdat")
    print client.fetchAll()

    transport.close()

except Thrift.TException, tx:
    print '%s' % (tx.message)
Again, this connects to the Hive server (port 10000), creates table zdat, and loads the data from the files stored in the 'new' subdirectory on the local file system.

Testing the results back in the Hive shell shows this:

hive> select * from zdat where symbol='A' limit 3;
OK
A 2015-08-07 40.139999 40.209999 39.630001 39.990002 NULL 39.990002
A 2015-08-06 40.950001 40.959999 39.91 40.119999 NULL 40.119999
A 2015-08-05 40.939999 41.189999 40.689999 40.720001 NULL 40.720001
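
The NULL values in the Volume column are most likely a consequence of the column type rather than of the data. DECIMAL(10,7) reserves seven of its ten digits for the fraction, which leaves only three for the integer part, and Hive returns NULL for a value that does not fit the declared type. The sketch below (Python 3) only approximates that rule in plain Python; the helper fits_decimal is hypothetical and not part of Hive.

```python
from decimal import Decimal, ROUND_HALF_UP


def fits_decimal(text, precision, scale):
    """Approximate check of whether a value fits DECIMAL(precision, scale).

    The value is rounded to `scale` fractional digits, then its integer
    part must fit in the remaining (precision - scale) digits.
    """
    step = Decimal(1).scaleb(-scale)  # e.g. 0.0000001 for scale 7
    value = Decimal(text).quantize(step, rounding=ROUND_HALF_UP)
    return abs(value) < Decimal(10) ** (precision - scale)


# A price and a volume taken from the sample data earlier in this post
for field in ("40.139999", "311800"):
    print(field, fits_decimal(field, 10, 7))
```

If that is the cause, a wider type for Volume (BIGINT, for example) would be the natural thing to try in the CREATE TABLE statement. The same limit would apply to any price of 1000 or more.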

As you can see above, working with data via Hadoop and Hive takes a different mindset. The tools for manipulating the data are limited, since individual records cannot simply be deleted or inserted the way they can in a relational database. Luckily, Python and Perl are efficient at making changes to source files before they are loaded.

Now that I have finished loading this historical data, I can go back and finish pulling the fundamental statistics. I'm finally getting closer to having usable data in Hadoop that I 'should' be able to pull into SAP HANA (crossing fingers).
