Wednesday, August 19, 2015
I put most of the details on how this was done on the small site I set up for my LLC (ZBasisADM Consulting) --> http://www.zbasisconsulting.com/PyHDB1
Most of what I did was update my previous scripts to make use of the HDB Python module available on GitHub --> https://github.com/SAP/PyHDB
This module made loading very simple. Here is a quick example that shows setting up a connection ('conn'), opening a cursor object ('cur'), and then executing a statement.
# Name....: mk_tables.py
# Author..: J. Haynes
# Purpose.: Create needed tables in HDB
# Pull in the needed libraries and setup the connection to the AWS instance
import pyhdb
conn = pyhdb.connect(
host="54.226.200.xxx",
port=30015,
user="hanauser",
password="xxxxx"
)
# Setup a cursor object
cur = conn.cursor()
# Create tables in HANA
cur.execute('CREATE TABLE F_STATS("SymId" INTEGER, "SymName" \
VARCHAR (10) null,"DateText" VARCHAR (20) null, "HeadLine" \
VARCHAR (30) null, "DataLine" VARCHAR (30) null)')
Nice and simple. This is a very quick, very easy way to load data into HANA from a remote system.
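To actually move rows, the same cursor can execute INSERT statements. Here is a minimal sketch against the F_STATS table created above; the values are placeholders, and whether the explicit commit is needed depends on the connection's autocommit setting:

# Insert a sample row into the F_STATS table created above (placeholder values)
cur.execute("INSERT INTO F_STATS VALUES (1, 'XYZ', '20150819', 'Market Cap', '1.64B')")
conn.commit()  # harmless if autocommit is already on

# Read the row back to confirm the load
cur.execute('SELECT * FROM F_STATS')
print cur.fetchall()
conn.close()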
In my other blog post, I provide examples of scripts that scrape information from web pages and then use that information to build larger sets of data.
Tuesday, August 11, 2015
HADOOP - Loading data via Hive Part Deux
The final set of data I need to load is the fundamental measures for each company. I updated the Python script to create a text file instead of loading directly into MariaDB. There is quite a bit of web scraping going on in this script, so it is a bit longer than the others (not to mention that I'm not really a Python hacker, so my code is more verbose than it needs to be).
The script below steps through the list of companies compiled previously into the file stock_names.txt and pulls the fundamental metrics from a page at Yahoo. It uses BeautifulSoup4 to extract data from each row in a table (tr's) based on the class of each table cell (td's). The first class (yfnc_tablehead1) contains the description of the metric and the second (yfnc_tabledata1) contains the value of the metric. Each of these is added to a list (hlist and dlist, respectively), and then the script steps through the contents of the first list to write out a line to a text file.
# Name.....: getb.py
# Author...: J. Haynes
# Purpose..: Generate a list of book values

# Load any needed libraries
import os, time
import string
import urllib2
from bs4 import BeautifulSoup

# For now this is a test URL
#url = "http://finance.yahoo.com/q/ks?s=GOOG+Key+Statistics"

print "Starting"
f1 = open('/home/hadoopuser/stockdata/stock_names.txt', 'r')
file_content = f1.readlines()
foutput = '/home/hadoopuser/stockdata/history/fundamentals/FValues_' + time.strftime("%Y%m%d") + '.txt'
f2 = open(foutput, 'w')

# Get the total number of rows in the table
#numrows = int(file_content.rowcount)
print "About %d tickers will be downloaded" % len(file_content)

# Step through each symbol and generate the URL for the extraction
for row in file_content:
    ticker, country, name = row.strip().split("\t")
    url = "http://finance.yahoo.com/q/ks?s=" + ticker + "+Key+Statistics"

    # Open the URL and parse
    aurl = urllib2.urlopen(url)
    soup = BeautifulSoup(aurl.read(), "html.parser")
    print "Pulling Ticker-->" + ticker

    # Create lists of data parsed by using BeautifulSoup
    head1 = soup.find_all("td", class_="yfnc_tablehead1")
    data1 = soup.find_all("td", class_="yfnc_tabledata1")

    # Create a list containing td header content
    hlist = [""]
    for row in head1:
        headline = row.contents[0]
        hlist.append(headline)

    # Create a list containing td data content
    dlist = [""]
    for row in data1:
        dataline = row.string
        dlist.append(dataline)

    # Write a line to the file for each metric collected for this symbol
    j = 1
    while j < len(hlist):
        if hlist[j] is not None:
            f2.write(ticker + '\t' + time.strftime("%Y%m%d") + '\t' + hlist[j] + '\t' + dlist[j] + '\n')
        j += 1
    time.sleep(1)

f1.close()
f2.close()
Once all of that data is collected, it needs to be uploaded into HDFS using Hive. I use another Python script to call the Hive Thrift server (again, this is a script I based on the documentation on the Apache Hive site).
#!/usr/bin/env python
# Name.....: updt_fund.py
# Author...: J. Haynes
# Purpose..: Copy fundamental information into Hive/Hadoop

import sys
from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

# Example of the data to load
#XYZ    20150810    Market Cap (intraday)    1.64B

try:
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = ThriftHive.Client(protocol)
    transport.open()

    # Purge/create the zfund table and load everything in the fundamentals directory
    client.execute("DROP TABLE IF EXISTS zfund PURGE")
    client.execute("CREATE TABLE IF NOT EXISTS zfund(symbol STRING, zDate STRING, measure STRING, value STRING) row format delimited fields terminated by '\t' stored as textfile")
    client.execute("LOAD DATA LOCAL INPATH '/home/hadoopuser/stockdata/history/fundamentals/' OVERWRITE INTO TABLE zfund")

    # Echo the loaded rows back as a sanity check
    client.execute("SELECT * FROM zfund")
    while (1):
        row = client.fetchOne()
        if (row == None):
            break
        print row

    client.execute("SELECT * FROM zfund")
    print client.fetchAll()
    transport.close()
except Thrift.TException, tx:
    print '%s' % (tx.message)
This script opens a connection to the Thrift server at port 10000, purges/creates a new zfund table, and then loads all of the files contained in the 'fundamentals' subdirectory. A quick test in the Hive shell confirms that the data matches what we need:
hive> select * from zfund where symbol='AA';
OK
AA    20150810    Market Cap (intraday)    13.20B
AA    20150810    Enterprise Value (Aug 11, 2015)    19.53B
AA    20150810    Trailing P/E (ttm, intraday):    21.00
Cool! So I now have all of the data I need for testing inside of Hadoop/Hive. Now I just need to connect the SAP HANA One system to this box and test the connectivity.
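One avenue I plan to try for that is HANA Smart Data Access: register the Hive box as a remote source over ODBC and expose zfund as a virtual table. What follows is only a sketch of the idea, not something tested here; the adapter name comes from the HANA SDA documentation, while the DSN ('hive1'), the credentials, and the four-part virtual-table name are assumptions that depend on how the Hive ODBC driver is set up. Since pyhdb is already working, the statements could even be issued from Python:

# Hedged sketch: assumes a Hive ODBC driver is installed on the HANA host
# and registered as the DSN 'hive1'
import pyhdb

conn = pyhdb.connect(host="54.226.200.xxx", port=30015,
                     user="hanauser", password="xxxxx")
cur = conn.cursor()
# Register the Hadoop/Hive box as a remote source via Smart Data Access
cur.execute("CREATE REMOTE SOURCE \"MYHIVE\" ADAPTER \"hiveodbc\" "
            "CONFIGURATION 'DSN=hive1' "
            "WITH CREDENTIAL TYPE 'PASSWORD' USING 'user=hadoopuser;password=xxxxx'")
# Expose the Hive table as a virtual table (the four-part name varies by driver)
cur.execute('CREATE VIRTUAL TABLE "V_ZFUND" AT "MYHIVE"."HIVE"."default"."zfund"')
cur.execute('SELECT COUNT(*) FROM "V_ZFUND"')
print cur.fetchone()
conn.close()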
Monday, August 10, 2015
HADOOP - Loading data via Hive
1. Background
After spending a few days getting my data into a MariaDB instance, I realized that I wasn't going to be able to pull that data into my SAP HANA One instance. I looked into creating a generic ODBC connection, but it appears that I am unable to log into the OS console to load any libraries I would need.
For the last day or so I have been trying to load data into Hadoop. At first I tried using Sqoop, since it has the ability to extract data from MariaDB, but I was spending too much time trying to figure out why Sqoop was saving the files to odd locations in Hadoop and wasn't getting far on loading my data. So I took a step back and used the create table/load functions in Hive to import what I need.
Hive provides SQL-like access to data stored within the Hadoop file system. Once I load data, I can go back and use basic SQL statements to validate what was imported into HDFS.
First I need to go over some of the changes I made to the process.
2. Updating the Stock Market List
The original list of stock market symbols didn't include the name of the company or the country where the company is headquartered. So I updated the symbol extraction script to include these new fields.
import string, io
import urllib2
from bs4 import BeautifulSoup

global f

def download_page(url):
    aurl = urllib2.urlopen(url)
    soup = BeautifulSoup(aurl.read(), "html.parser")
    print url
    for row in soup('table')[1]('tr'):
        tds = row('td')
        if (len(tds) > 0):
            f.write(tds[1].string + '\t')
            f.write(tds[2].string + '\t')
            f.write(tds[0].string or u"No Name")
            f.write(u'\n')

f = io.open('stock_names.txt', 'w', encoding='utf8')
#f = open('stock_names.txt', 'w')
url_part1 = 'http://en.wikipedia.org/wiki/Companies_listed_on_the_New_York_Stock_Exchange_'
url = url_part1 + '(0-9)'
download_page(url)
for letter in string.uppercase[:26]:
    url = url_part1 + '(' + letter + ')'
    download_page(url)
f.close()
Here are the lines I added:

f.write(tds[1].string + '\t')
f.write(tds[2].string + '\t')
f.write(tds[0].string or u"No Name")
f.write(u'\n')

This writes the symbol (tds[1]), the country (tds[2]), and the name (tds[0]) to the list; if the name is blank, the text "No Name" is written instead.
When I'm done, the file looks like this (note the tab-delimited format):
DDD     USA     3D Systems Corporation
MMM     USA     3M Company
WBAI    USA     500.com
WUBA    China   58.com Inc.
CAS     USA     A. M. Castle & Co.
This data is then easily loaded into Hadoop via the following script, which utilizes the Hive server (this is from an example in the Apache Hive documentation):

# Author...: J. Haynes
# Purpose..: Copy Stock Symbols file into Hive/Hadoop

import sys
from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

try:
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = ThriftHive.Client(protocol)
    transport.open()

    # Create the zsym table (tab-delimited) and load the symbols file
    client.execute("CREATE TABLE IF NOT EXISTS zsym(Symbol STRING, Country STRING, SymName STRING) row format delimited fields terminated by '\t' stored as textfile")
    client.execute("LOAD DATA LOCAL INPATH '/home/hadoopuser/stockdata/stock_names.txt' OVERWRITE INTO TABLE zsym")

    # Echo the loaded rows back as a sanity check
    client.execute("SELECT * FROM zsym")
    while (1):
        row = client.fetchOne()
        if (row == None):
            break
        print row

    client.execute("SELECT * FROM zsym")
    print client.fetchAll()
    transport.close()
except Thrift.TException, tx:
    print '%s' % (tx.message)
This script utilizes the 'Thrift' server, which is really just Hive running as a server on the system (the command to start it is hive --service hiveserver, and it comes up on port 10000).
The first step creates a table named 'zsym' and sets the incoming format so it reads tab-delimited files. The second step completes the import and will remove any current data if it already exists.
After the data is loaded, I can go back into the Hive command shell and review the data. Here is a quick example:
hive> select * from zsym limit 20;
OK
DDD     USA     3D Systems Corporation
MMM     USA     3M Company
WBAI    USA     500.com
WUBA    China   58.com Inc.
CAS     USA     A. M. Castle & Co.
AOS     USA     A. O. Smith Corporation
ATEN    USA     A10 Networks, Inc.
AIR     USA     AAR Corporation
Here is another check on just the country field:
hive> select * from zsym where country = 'Switzerland';
OK
ABB    Switzerland    ABB LTD.
AWH    Switzerland    Allied World Assurance Company Holdings, AG
CS     Switzerland    Credit Suisse Group AG
MTD    Switzerland    Mettler Toledo International Inc.
NVS    Switzerland    Novartis AG
3. Updating the Stock Market History Data
On review, I also noticed that I wasn't pulling the full history for the stock market data. So I updated the original script to include a wider date range (the details are at https://code.google.com/p/yahoo-finance-managed/wiki/csvHistQuotesDownload).
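For reference, those query parameters encode the date range: a/b/c are the start month (zero-based), day, and year, and d/e/f are the end month (zero-based), day, and year, with g=d for daily quotes, at least per the csvHistQuotesDownload notes linked above. Here is a small sketch of a helper that builds the URL from normal dates (the function name is mine):

# Build a Yahoo historical-quotes URL; months in the a/d parameters are zero-based
def hist_url(ticker, start, end):
    # start and end are (year, month, day) tuples using ordinary 1-based months
    a, b, c = start[1] - 1, start[2], start[0]
    d, e, f = end[1] - 1, end[2], end[0]
    return ("http://ichart.finance.yahoo.com/table.csv?s=%s"
            "&d=%d&e=%d&f=%d&g=d&a=%d&b=%d&c=%d&ignore=.csv"
            % (ticker, d, e, f, a, b, c))

print hist_url('IBM', (1995, 2, 1), (2015, 9, 1))  # matches the range used below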
Here is the updated script to pull that data:
import string, csv
import urllib2
from bs4 import BeautifulSoup

global f

# https://code.google.com/p/yahoo-finance-managed/wiki/csvHistQuotesDownload
url_part1 = 'http://ichart.finance.yahoo.com/table.csv?s='
url_part2 = '&d=8&e=1&f=2015&g=d&a=1&b=1&c=1995&ignore=.csv'

print "Starting"
f = open('stock_names.txt', 'r')
file_content = f.readlines()
count = 1
print "About %d tickers will be downloaded" % len(file_content)

for row in file_content:
    ticker, country, name = row.strip().split("\t")
    url = url_part1 + ticker + url_part2
    print "\nTrying-->" + url
    try:
        # This will cause an exception on a 404
        response = urllib2.urlopen(url)
        print "Downloading ticker %s (%d out of %d)" % (ticker, count, len(file_content))
        count = count + 1
        history_file = open('history/' + ticker + '.csv', 'w')
        history_file.write(response.read())
        history_file.close()
    except Exception, e:
        pass
f.close()
The other issue I encountered deals with how Hive can load data. When I run the script above, it populates an entire directory with csv files in the format shown here (also note the header line, which we will correct for below):

Date,Open,High,Low,Close,Volume,Adj Close
2015-08-07,106.349998,107.110001,106.040001,106.620003,311800,106.620003
2015-08-06,109.370003,109.419998,105.940002,106.18,401400,106.18
2015-08-05,106.949997,109.540001,106.080002,109.370003,539800,109.370003
Hive has the ability to load all of the files in a given directory (a really handy feature). The problem is that Hive doesn't know each file represents a separate stock symbol. To fix this, I wrote the script below, which creates a new set of csv files with the symbol added to the beginning of each data line.

#!/usr/bin/env python
# Name.....: fix_dat.py
# Author...: J. Haynes
# Purpose..: Fix stock data files

import sys, glob, os

os.chdir("/home/hadoopuser/stockdata/history")
for file in glob.glob("*.csv"):
    # Open the text file for reading
    f1 = open(file, 'r')
    # Skip past the header in the file
    next(f1)
    newfilenm = 'new/' + file + "_new"
    f2 = open(newfilenm, 'w')
    # Step through each line and add the symbol to the front
    for row in f1:
        symbol = os.path.splitext(file)
        row = symbol[0] + ',' + row
        f2.write(row)
    print "Done!"
    f1.close()
    f2.close()
This script takes all of the files in the history directory and creates a new file with the stock symbol added. Specifically, it takes the name of the file, strips off the extension, and adds it to the beginning of each line like this (note that the header line from before was skipped with the 'next(f1)' command):
ESI,2015-08-07,2.99,3.45,2.98,3.02,567600,3.02
ESI,2015-08-06,3.00,3.09,2.92,3.00,520500,3.00
ESI,2015-08-05,3.00,3.08,2.95,3.00,487100,3.00
ESI,2015-08-04,3.01,3.06,2.82,3.02,834600,3.02
Now that an entire directory of files has been updated, it can be loaded into Hadoop with this script:
#!/usr/bin/env python
# Name.....: updt_dat.py
# Author...: J. Haynes
# Purpose..: Copy stock data files into Hive/Hadoop

import sys
from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

# Example of the data to load
#Date,Open,High,Low,Close,Volume,Adj Close
#2010-01-28,6.38,6.38,6.12,6.30,61000,5.68551

try:
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = ThriftHive.Client(protocol)
    transport.open()

    # Purge/create the zdat table and load everything in the 'new' subdirectory
    client.execute("DROP TABLE IF EXISTS zdat PURGE")
    client.execute("CREATE TABLE IF NOT EXISTS zdat(symbol STRING, zDate DATE, Open DECIMAL(10,7), High DECIMAL(10,7), Low DECIMAL(10,7), Close DECIMAL(10,7), Volume DECIMAL(10,7), AdjClose DECIMAL(10,7)) row format delimited fields terminated by ',' stored as textfile")
    client.execute("LOAD DATA LOCAL INPATH '/home/hadoopuser/stockdata/history/new' OVERWRITE INTO TABLE zdat")

    # Echo the loaded rows back as a sanity check
    client.execute("SELECT * FROM zdat")
    while (1):
        row = client.fetchOne()
        if (row == None):
            break
        print row

    client.execute("SELECT * FROM zdat")
    print client.fetchAll()
    transport.close()
except Thrift.TException, tx:
    print '%s' % (tx.message)
Again, this connects to the Hive server (port 10000), creates table zdat, and loads the data from the files stored in the 'new' subdirectory on the local file system.
Testing the results back in the Hive shell shows this:
hive> select * from zdat where symbol='A' limit 3;
OK
A    2015-08-07    40.139999    40.209999    39.630001    39.990002    NULL    39.990002
A    2015-08-06    40.950001    40.959999    39.91    40.119999    NULL    40.119999
A    2015-08-05    40.939999    41.189999    40.689999    40.720001    NULL    40.720001
As you can see above, working with data via Hadoop and Hive takes a different mindset. The tools for manipulating the data are limited, since records cannot be deleted or inserted. Luckily, Python and PERL are efficient at making changes to source files before they are loaded. (As an aside, the NULLs in the Volume column above appear to be a side effect of the DECIMAL(10,7) type, which leaves only three digits for the integer part; a wider DECIMAL or a BIGINT would likely capture the volume figures.)
Now that I have finished loading this historical data, I can go back and finish pulling the fundamental statistics. I'm finally getting closer to having usable data in Hadoop that I 'should' be able to pull into SAP HANA (crossing fingers).