Tuesday, August 11, 2015

HADOOP - Loading data via Hive Part Deux

The final set of data I need to load is the fundamental measures for each company. I updated the Python script to create a text file instead of loading directly into MariaDB. There is quite a bit of web scraping going on in this script, so it is a bit longer than the others (not to mention that I'm not really a Python hacker, so my code is more verbose than it needs to be).
 
# Name.....: getb.py
# Author...: J. Haynes
# Purpose..: Generate a list of book values

# Load any needed libraries
import os,time
import string
import urllib2
from bs4 import BeautifulSoup

# For now this is a test URL
#url = "http://finance.yahoo.com/q/ks?s=GOOG+Key+Statistics"

print "Starting"

f1 = open('/home/hadoopuser/stockdata/stock_names.txt', 'r')
file_content = f1.readlines()
foutput = '/home/hadoopuser/stockdata/history/fundamentals/FValues_' + time.strftime("%Y%m%d") + '.txt'
f2 = open(foutput, 'w')

# Get the total number of rows in the table
#numrows = int (file_content.rowcount)
print "About %d tickers will be downloaded" % len(file_content)

# step through each symbol and generate the URL for the extraction
for row in file_content:
    ticker, country, name = row.strip().split("\t")
    url = "http://finance.yahoo.com/q/ks?s=" + ticker + "+Key+Statistics"

    # Open the URL and parse
    aurl = urllib2.urlopen(url)
    soup = BeautifulSoup(aurl.read(), "html.parser")
    print "Pulling Ticker-->" + ticker

    # Create lists of data parsed by using BeautifulSoup
    head1 = soup.find_all("td", class_="yfnc_tablehead1")
    data1 = soup.find_all("td", class_="yfnc_tabledata1")

    # Create a list containing td header content
    hlist = [""]
    for row in head1:
        headline = row.contents[0]
        #print "Headline->" + headline
        hlist.append(headline)

    # Create a list containing td data content
    dlist = [""]
    for row in data1:
        dataline = row.string
        dlist.append(dataline)

    # Display what has been collected for this symbol and insert into the file
    ##print "Number of records to add to the table is:"
    ##print len(hlist) - 1
    j = 1
    while j < len(hlist):
        if hlist[j] is not None:
            f2.write(ticker + '\t' + time.strftime("%Y%m%d") + '\t' + hlist[j] + '\t' + dlist[j] + '\n')
        j += 1

    time.sleep(1)

f1.close()
f2.close()

The script above steps through the list of companies compiled previously into the file stock_names.txt and pulls the fundamental metrics from a page at Yahoo. It uses BeautifulSoup 4 to extract data from each row (tr) of the statistics table based on the class of each table cell (td). The first class (yfnc_tablehead1) contains the description of the metric and the second (yfnc_tabledata1) contains the value of the metric. Each is appended to a list (hlist and dlist respectively), and then the script steps through the contents of the first list to write out a line of the text file.
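To make the pairing of the two lists a bit more concrete, here is a minimal sketch that runs the same find_all() calls against a hard-coded HTML fragment instead of the live Yahoo page (the fragment is just my assumption about what the relevant cells look like):

# Illustration only: how the yfnc_tablehead1/yfnc_tabledata1 cells pair up
from bs4 import BeautifulSoup

# Made-up stand-in for the Key Statistics page; only the td classes matter
html = """
<table>
<tr><td class="yfnc_tablehead1">Market Cap (intraday)</td>
    <td class="yfnc_tabledata1">13.20B</td></tr>
<tr><td class="yfnc_tablehead1">Trailing P/E (ttm, intraday):</td>
    <td class="yfnc_tabledata1">21.00</td></tr>
</table>
"""

soup = BeautifulSoup(html, "html.parser")
heads = soup.find_all("td", class_="yfnc_tablehead1")
datas = soup.find_all("td", class_="yfnc_tabledata1")

# Walk both lists in step, just like hlist/dlist in getb.py
for head, data in zip(heads, datas):
    print head.get_text() + "\t" + data.string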

Once all of that data is collected, it needs to be uploaded into HDFS using Hive. I use another Python script to call the Hive Thrift server (again, this script is based on the example in the documentation on the Apache Hive site).
#!/usr/bin/env python
# Name.....: updt_fund.py
# Author...: J. Haynes
# Purpose..: Copy fundamental information into Hive/Hadoop

import sys

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

# Example of the data to load
#XYZ 20150810 Market Cap (intraday) 1.64B

try:
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)

    client = ThriftHive.Client(protocol)
    transport.open()
    client.execute("DROP TABLE IF EXISTS zfund PURGE")
    client.execute("CREATE TABLE IF NOT EXISTS zfund(symbol STRING, zDate STRING, measure STRING, value STRING) row format delimited fields terminated by '\t' stored as textfile")
    client.execute("LOAD DATA LOCAL INPATH '/home/hadoopuser/stockdata/history/fundamentals/' OVERWRITE INTO TABLE zfund")

    client.execute("SELECT * FROM zfund")
    while (1):
        row = client.fetchOne()
        if (row == None):
            break
        print row

    client.execute("SELECT * FROM zfund")
    print client.fetchAll()

    transport.close()

except Thrift.TException, tx:
    print '%s' % (tx.message)

This script opens a connection to the Thrift server on port 10000, drops and recreates the zfund table, and then loads all of the files contained in the 'fundamentals' subdirectory.
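For a quick sanity check from Python (before dropping into the Hive shell below), the same Thrift client pattern can be reused to count how many measures were loaded per symbol. This is only a sketch that assumes the same hive_service bindings and a server listening on localhost:10000:

# Sketch: count the loaded rows per symbol over the same Thrift interface
from hive_service import ThriftHive
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

transport = TTransport.TBufferedTransport(TSocket.TSocket('localhost', 10000))
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = ThriftHive.Client(protocol)

transport.open()
client.execute("SELECT symbol, count(*) FROM zfund GROUP BY symbol")
for line in client.fetchAll():
    # each row comes back as a single tab-delimited string, e.g. "AA\t<count>"
    print line
transport.close()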

A quick test in the Hive shell confirms that the data matches what we need:
hive> select * from zfund where symbol='AA';
OK
AA 20150810 Market Cap (intraday) 13.20B
AA 20150810 Enterprise Value (Aug 11, 2015) 19.53B
AA 20150810 Trailing P/E (ttm, intraday): 21.00
 
Cool! So I now have all of the data I need for testing inside of Hadoop/Hive. Now I just need to connect the SAP HANA One system to this box and test the connectivity.
