# This file is intended to run on spark! For more information please read the README.md file contained in the same package!

from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import Row, HiveContext
import sys
import urllib
import gzip
import os
import datetime
import csv

# download the blacklist file (one space-separated "domain title" pair per line, per the split below) and return a dataframe
def downloadBlacklist(sqlContext, blacklistLocation, blacklistDF):
    # reuse the cached dataframe if the blacklist has already been loaded
    if blacklistDF is not None:
        return blacklistDF
    if not os.path.exists(blacklistLocation):
        urllib.urlretrieve('https://s3.amazonaws.com/dd-interview-data/data_engineer/wikipedia/blacklist_domains_and_pages', blacklistLocation)
    return sqlContext.createDataFrame(sqlContext.read.text('file://' + blacklistLocation).rdd
                                      .map(lambda r: r[0].split(' '))
                                      .map(lambda s: Row(b_domain=s[0], b_title=s[1])))


# download and gunzip the file to a local temp directory
def downloadFile(dt, temp, filename):
    print 'writing ' + filename + ' to gz file'
    urllib.urlretrieve('https://dumps.wikimedia.org/other/pagecounts-all-sites/' + dt.strftime('%Y') + '/' + dt.strftime('%Y-%m') + '/' + filename + '.gz', temp + '/' + filename + '.gz')
    print 'unzipping ' + filename
    # stream the gzipped dump into an uncompressed local copy
    gzFile = gzip.GzipFile(temp + '/' + filename + '.gz')
    with open(temp + '/' + filename, 'wb') as out:
        for line in gzFile:
            out.write(line)
    gzFile.close()


# reads the downloaded pagecount file and maps it as a space-delimited file into domain/title/views/total columns
def createDataFrame(sqlContext, temp, filename):
    return sqlContext.createDataFrame(sqlContext.read.text('file://' + temp + '/' + filename).rdd
                                      .map(lambda r: r[0].split(' '))
                                      .map(lambda s: Row(domain=s[0], title=s[1], views=s[2], total=s[3])))

# write the results into the results directory
def writeResults(sqlContext, results, filename, resultDF):
    print 'Writing results for ' + filename
    # collapse each row back into a single space-separated line and write one output file per hour
    sqlContext.createDataFrame(resultDF.rdd.map(lambda r: Row(value=r[0] + ' ' + r[1] + ' ' + r[2] + ' ' + r[3]))).coalesce(1).write.text('file://' + results + '/' + filename)


# removes all blacklisted pages. The two dataframes do not share column names, so use a left outer join and keep only the rows with no blacklist match (see the Spark 2.x note below)
def filterBlacklist(df, blacklistDF):
    return df.join(blacklistDF, [df.domain == blacklistDF.b_domain, df.title == blacklistDF.b_title], 'left_outer').filter('b_title is null').drop('b_title').drop('b_domain')
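# Note (not used here): on Spark 2.x the same filtering can be written as a single left anti
# join instead of the left_outer + "is null" pattern above, e.g.
#   df.join(blacklistDF, [df.domain == blacklistDF.b_domain, df.title == blacklistDF.b_title], 'left_anti')
# This script targets HiveContext / Spark 1.x, so the portable form above is kept.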

# gathers the top n (25 here) articles for the given day and hour by pageviews for each unique domain
def sortFirstNColumns(sqlContext, df, n):
    df.registerTempTable('df')
    # rank pages within each domain by pageviews (highest first) and keep only the first n rows per domain
    return sqlContext.sql("SELECT o.domain, o.title, o.views, o.total FROM (SELECT i.domain, i.title, i.views, i.total, ROW_NUMBER() OVER (PARTITION BY i.domain ORDER BY CAST(i.views AS INT) DESC) AS rownum FROM df i) o WHERE o.rownum <= " + str(n))
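# A DataFrame-API equivalent of the window query above (a sketch, assuming Spark 1.6+ and the
# same domain/title/views/total column names) would be:
#   from pyspark.sql import Window
#   from pyspark.sql.functions import row_number, col
#   w = Window.partitionBy('domain').orderBy(col('views').cast('int').desc())
#   df.withColumn('rownum', row_number().over(w)).filter('rownum <= 25').drop('rownum')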


# lazily creates the SparkContext / HiveContext. Not done at the start of the application so that no context is created when no files need downloading
def getSqlContext(sqlContext):
    if sqlContext is not None:
        return sqlContext
    sc = SparkContext("local", "Pageviews")
    return HiveContext(sc)


# process one pagecounts file for each hour between $1-$2 and $3-$4
def iterateOnFiles(startDate, endDate, temp, results, dateFormat, blacklistLocation):
    sqlContext = None
    blacklistDF = None
    datetimeIterator = startDate
    while datetimeIterator <= endDate:
        filename = 'pagecounts-' + datetimeIterator.strftime(dateFormat)
        # skip this hour if its results already exist, so work is never repeated
        if os.path.exists(results + '/' + filename):
            datetimeIterator = datetimeIterator + datetime.timedelta(hours=1)
            continue
        print datetimeIterator.strftime(dateFormat)
        sqlContext = getSqlContext(sqlContext)
        downloadFile(datetimeIterator, temp, filename)
        df = createDataFrame(sqlContext, temp, filename)
        # cache the blacklist dataframe across iterations and keep only the non-blacklisted rows
        blacklistDF = downloadBlacklist(sqlContext, blacklistLocation, blacklistDF)
        df = filterBlacklist(df, blacklistDF)
        resultDF = sortFirstNColumns(sqlContext, df, 25)
        writeResults(sqlContext, results, filename, resultDF)
        datetimeIterator = datetimeIterator + datetime.timedelta(hours=1)
    if sqlContext is None:
        print 'No work to do!'


# Make directories, parse arguments, and kick off the per-hour processing
def executePageviews():
    dateFormat = '%Y%m%d-%H%M%S'
    currentDate = datetime.datetime.strptime(datetime.datetime.now().strftime('%Y%m%d-%H') + '0000', dateFormat)
    startDate = currentDate
    endDate = startDate
    # gather the program input parameters for the date and hour of data to analyze. An end date is also acceptable, allowing this application to run for a range of dates and hours
    if len(sys.argv) >= 3:
        startDate = datetime.datetime.strptime(sys.argv[1] + '-' + sys.argv[2] + '0000', dateFormat)
        endDate = startDate
        if len(sys.argv) >= 5:
            endDate = datetime.datetime.strptime(sys.argv[3] + '-' + sys.argv[4] + '0000', dateFormat)
    # the pagecounts-all-sites dumps only cover this window, so reject anything outside it
    earliest = datetime.datetime.strptime('20140923-010000', dateFormat)
    latest = datetime.datetime.strptime('20160805-120000', dateFormat)
    if startDate < earliest or startDate > latest or endDate < earliest or endDate > latest:
        print 'Start and end time must be between 20140923 01 and 20160805 12'
        return
    # make some directories
    temp = "/tmp/pageviews"
    # write results next to the script itself; dirname of the absolute path also covers the case where the script is invoked without a path prefix
    results = os.path.dirname(os.path.abspath(sys.argv[0])) + "/pageviews_results"
    if not os.path.exists(temp):
        os.makedirs(temp)
    if not os.path.exists(results):
        os.makedirs(results)
    # the blacklist is downloaded lazily from inside the per-hour loop
    blacklistLocation = temp + '/blacklist'
    iterateOnFiles(startDate, endDate, temp, results, dateFormat, blacklistLocation)


if __name__ == '__main__':
    try:
        executePageviews()
    except Exception:
        print 'Error while preparing data! Please check your parameters and try again!', sys.argv[1:]
        raise
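
# Example invocation (script name and date/hour values are illustrative; the dataset only
# covers 2014-09-23 01h through 2016-08-05 12h):
#   spark-submit pageviews.py 20160101 05              # a single hour
#   spark-submit pageviews.py 20160101 05 20160102 12  # a range of hours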