After spending a few days getting my data into a MariaDB instance I realized that I wasn't going to be able to pull that data into my SAP HANA One instance. I looked into creating a generic ODBC connection but it appears that I am unable to log into the console of the OS to load any libraries I would need.
For the last day or so I have been trying to load data into Hadoop. At first I tried using Sqoop, since it has the ability to extract data from MariaDB. But I was spending too much time trying to figure out why Sqoop was saving files to odd locations in Hadoop, and I wasn't getting far on loading my data. So I took a step back and used the create table/load functions in Hive to import what I need.
Hive provides SQL-like access to data stored within the Hadoop file system. Once the data is loaded, I can go back and use basic SQL statements to validate what was imported into HDFS.
First I need to go over some of the changes I made to the process.
2. Updating the Stock Market List
The original list of stock market symbols didn't include the name of the company or the country where the company is headquartered. So I updated the symbol extraction script to include these new fields.
```python
import string, io
import urllib2
from bs4 import BeautifulSoup

global f

def download_page(url):
    aurl = urllib2.urlopen(url)
    soup = BeautifulSoup(aurl.read(), "html.parser")
    print url
    for row in soup('table')[1]('tr'):
        tds = row('td')
        if (len(tds) > 0):
            f.write(tds[1].string + '\t')
            f.write(tds[2].string + '\t')
            f.write(tds[0].string or u"No Name")
            f.write(u'\n')

f = io.open('stock_names.txt', 'w', encoding='utf8')
#f = open('stock_names.txt', 'w')
url_part1 = 'http://en.wikipedia.org/wiki/Companies_listed_on_the_New_York_Stock_Exchange_'
url = url_part1 + '(0-9)'
download_page(url)
for letter in string.uppercase[:26]:
    url = url_part1 + '(' + letter + ')'
    download_page(url)
f.close()
```
Here are the lines I added:
```python
f.write(tds[1].string + '\t')
f.write(tds[2].string + '\t')
f.write(tds[0].string or u"No Name")
f.write(u'\n')
```
When I'm done, the file looks like this (note the tab delimited format):
```
DDD	USA	3D Systems Corporation
MMM	USA	3M Company
WBAI	USA	500.com
WUBA	China	58.com Inc.
CAS	USA	A. M. Castle & Co.
```
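Everything downstream assumes exactly three tab-separated fields per line, so it is worth sanity-checking the file before loading it into Hive. Here is a minimal sketch of such a check (`parse_symbol_line` is my name for the helper, not part of the original script; the sample rows are hard-coded in place of reading stock_names.txt):

```python
def parse_symbol_line(line):
    """Split one line of stock_names.txt into (symbol, country, name)."""
    fields = line.rstrip('\n').split('\t')
    if len(fields) != 3:
        raise ValueError("expected 3 tab-separated fields, got %d" % len(fields))
    return tuple(fields)

# Two sample rows from the file above
sample = [
    "DDD\tUSA\t3D Systems Corporation\n",
    "WUBA\tChina\t58.com Inc.\n",
]
for line in sample:
    symbol, country, name = parse_symbol_line(line)
    print("%s (%s): %s" % (symbol, country, name))
```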
This script copies the symbols file into Hive:

```python
# Author...: J. Haynes
# Purpose..: Copy Stock Symbols file into Hive/Hadoop
import sys
from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

try:
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = ThriftHive.Client(protocol)
    transport.open()
    client.execute("CREATE TABLE IF NOT EXISTS zsym(Symbol STRING, Country STRING, SymName STRING) "
                   "row format delimited fields terminated by '\t' stored as textfile")
    client.execute("LOAD DATA LOCAL INPATH '/home/hadoopuser/stockdata/stock_names.txt' "
                   "OVERWRITE INTO TABLE zsym")
    client.execute("SELECT * FROM zsym")
    while (1):
        row = client.fetchOne()
        if (row == None):
            break
        print row
    client.execute("SELECT * FROM zsym")
    print client.fetchAll()
    transport.close()
except Thrift.TException, tx:
    print '%s' % (tx.message)
```
This script utilizes the 'Thrift' server, which is really just Hive running as a server on the system (the command to start it is `hive --service hiveserver`, which listens on port 10000).
The first step creates a table named 'zsym' and sets the incoming format so it reads tab-delimited files. The second step completes the import, overwriting any data already in the table.
After the data is loaded, I can go back into the Hive command shell and review the data. Here is a quick example:
```
hive> select * from zsym limit 20;
OK
DDD	USA	3D Systems Corporation
MMM	USA	3M Company
WBAI	USA	500.com
WUBA	China	58.com Inc.
CAS	USA	A. M. Castle & Co.
AOS	USA	A. O. Smith Corporation
ATEN	USA	A10 Networks, Inc.
AIR	USA	AAR Corporation
```
Here is another check on just the country field:
```
hive> select * from zsym where country = 'Switzerland';
OK
ABB	Switzerland	ABB LTD.
AWH	Switzerland	Allied World Assurance Company Holdings, AG
CS	Switzerland	Credit Suisse Group AG
MTD	Switzerland	Mettler Toledo International Inc.
NVS	Switzerland	Novartis AG
```
3. Updating the Stock Market History Data

On review, I also noticed that I wasn't pulling the full history for the stock market data. So I updated the original script to request a wider date range (the details are at https://code.google.com/p/yahoo-finance-managed/wiki/csvHistQuotesDownload).
Here is the updated script to pull that data:
```python
import string, csv
import urllib2
from bs4 import BeautifulSoup

# https://code.google.com/p/yahoo-finance-managed/wiki/csvHistQuotesDownload
url_part1 = 'http://ichart.finance.yahoo.com/table.csv?s='
url_part2 = '&d=8&e=1&f=2015&g=d&a=1&b=1&c=1995&ignore=.csv'

print "Starting"
f = open('stock_names.txt', 'r')
file_content = f.readlines()
count = 1
print "About %d tickers will be downloaded" % len(file_content)
for row in file_content:
    ticker, country, name = row.strip().split("\t")
    url = url_part1 + ticker + url_part2
    print "\nTrying-->" + url
    try:
        # This will cause an exception on a 404
        response = urllib2.urlopen(url)
        print "Downloading ticker %s (%d out of %d)" % (ticker, count, len(file_content))
        count = count + 1
        history_file = open('history/' + ticker + '.csv', 'w')
        history_file.write(response.read())
        history_file.close()
    except Exception, e:
        pass
f.close()
```
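The cryptic letters in `url_part2` are the date range: per the wiki page linked above, `a`/`b`/`c` are the start month (zero-based), day, and year, and `d`/`e`/`f` are the same for the end date. A small helper makes the hard-coded string easier to read (`build_history_url` is my name for it, not part of the original script):

```python
def build_history_url(ticker, start, end):
    """Build a Yahoo Finance historical-quotes URL.

    start and end are (year, month, day) tuples; note that the
    service expects zero-based months (0 = January).
    """
    return ('http://ichart.finance.yahoo.com/table.csv?s=%s'
            '&d=%d&e=%d&f=%d&g=d&a=%d&b=%d&c=%d&ignore=.csv'
            % (ticker, end[1] - 1, end[2], end[0],
               start[1] - 1, start[2], start[0]))

# Reproduces the hard-coded range above: Feb 1, 1995 to Sep 1, 2015
url = build_history_url('IBM', (1995, 2, 1), (2015, 9, 1))
```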
Each downloaded file starts with a header row followed by one line per trading day:

```
Date,Open,High,Low,Close,Volume,Adj Close
2015-08-07,106.349998,107.110001,106.040001,106.620003,311800,106.620003
2015-08-06,109.370003,109.419998,105.940002,106.18,401400,106.18
2015-08-05,106.949997,109.540001,106.080002,109.370003,539800,109.370003
```
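The download script imports `csv` but never actually uses it; the module is a natural fit for reading these files back, since it handles the header row and field splitting for you. A minimal sketch, with the sample rows above inlined in place of a real file:

```python
import csv
import io

# Stand-in for open('history/XYZ.csv') using the sample rows above
sample = io.StringIO(
    u"Date,Open,High,Low,Close,Volume,Adj Close\n"
    u"2015-08-07,106.349998,107.110001,106.040001,106.620003,311800,106.620003\n"
    u"2015-08-06,109.370003,109.419998,105.940002,106.18,401400,106.18\n"
)

# DictReader uses the header row as the field names
reader = csv.DictReader(sample)
rows = [row for row in reader]
closes = [float(row['Close']) for row in rows]
```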
Next, a short script adds the ticker symbol to the front of each line of every downloaded file:

```python
#!/usr/bin/env python
# Name.....: fix_dat.py
# Author...: J. Haynes
# Purpose..: Fix stock data files
import sys, glob, os

os.chdir("/home/hadoopuser/stockdata/history")
for file in glob.glob("*.csv"):
    # open the text file for reading
    f1 = open(file, 'r')
    # Skip past the header in the file
    next(f1)
    newfilenm = 'new/' + file + "_new"
    f2 = open(newfilenm, 'w')
    # step through each line and add the symbol to the front
    for row in f1:
        symbol = os.path.splitext(file)
        row = symbol[0] + ',' + row
        f2.write(row)
    f1.close()
    f2.close()
print "Done!"
```
This script takes all of the files in the history directory and creates a new file with the stock symbol added. Specifically, it takes the name of the file, strips off the extension, and adds it to the beginning of each line like this (note that the header line from before was skipped with the 'next(f1)' command):
```
ESI,2015-08-07,2.99,3.45,2.98,3.02,567600,3.02
ESI,2015-08-06,3.00,3.09,2.92,3.00,520500,3.00
ESI,2015-08-05,3.00,3.08,2.95,3.00,487100,3.00
ESI,2015-08-04,3.01,3.06,2.82,3.02,834600,3.02
```
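The per-line transformation in fix_dat.py boils down to a pure function, which makes it easy to test in isolation (`prefix_row` is my name for this sketch, not something in the original script):

```python
import os

def prefix_row(filename, row):
    """Prepend the ticker symbol (the filename minus its extension)
    to one CSV row, mirroring what fix_dat.py does per line."""
    symbol = os.path.splitext(filename)[0]
    return symbol + ',' + row

line = prefix_row('ESI.csv', '2015-08-07,2.99,3.45,2.98,3.02,567600,3.02\n')
```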
Now that an entire directory of files has been updated, it can be loaded into Hadoop with this script:
```python
#!/usr/bin/env python
# Name.....: updt_dat.py
# Author...: J. Haynes
# Purpose..: Copy stock data files into Hive/Hadoop
import sys
from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

# Example of the data to load:
# Date,Open,High,Low,Close,Volume,Adj Close
# 2010-01-28,6.38,6.38,6.12,6.30,61000,5.68551

try:
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = ThriftHive.Client(protocol)
    transport.open()
    client.execute("DROP TABLE IF EXISTS zdat PURGE")
    client.execute("CREATE TABLE IF NOT EXISTS zdat(symbol STRING, zDate DATE, "
                   "Open DECIMAL(10,7), High DECIMAL(10,7), Low DECIMAL(10,7), "
                   "Close DECIMAL(10,7), Volume DECIMAL(10,7), AdjClose DECIMAL(10,7)) "
                   "row format delimited fields terminated by ',' stored as textfile")
    client.execute("LOAD DATA LOCAL INPATH '/home/hadoopuser/stockdata/history/new' "
                   "OVERWRITE INTO TABLE zdat")
    client.execute("SELECT * FROM zdat")
    while (1):
        row = client.fetchOne()
        if (row == None):
            break
        print row
    client.execute("SELECT * FROM zdat")
    print client.fetchAll()
    transport.close()
except Thrift.TException, tx:
    print '%s' % (tx.message)
```
Again, this connects to the Hive server (port 10000), creates table zdat, and loads the data from the files stored in the 'new' subdirectory on the local file system.
Testing the results back in the Hive shell shows this:
```
hive> select * from zdat where symbol='A' limit 3;
OK
A	2015-08-07	40.139999	40.209999	39.630001	39.990002	NULL	39.990002
A	2015-08-06	40.950001	40.959999	39.91	40.119999	NULL	40.119999
A	2015-08-05	40.939999	41.189999	40.689999	40.720001	NULL	40.720001
```
As you can see above, working with data via Hadoop and Hive takes a different mindset. (Note that the Volume column comes back NULL: DECIMAL(10,7) reserves seven of its ten digits for the fractional part, leaving only three for the integer part, so volumes in the hundreds of thousands overflow and Hive stores NULL; a BIGINT column would be a better fit.) The tools for manipulating the data are limited since records cannot be deleted or inserted. Luckily, Python and Perl are efficient at making changes to the source files before they are loaded.
Now that I have finished loading this historical data, I can go back and finish pulling the fundamental statistics. I'm finally getting closer to having usable data in Hadoop that I 'should' be able to pull into SAP HANA (crossing fingers).