Odyssey
web_stats.py
1 #!/usr/bin/env python
2 '''Collect web stats from elasticsearch for later parsing'''
3 
4 import os
5 from datetime import datetime
6 from datetime import timedelta
7 import tempfile
8 import logging
9 import pwd
10 # pylint: disable=import-error
11 from elasticsearch import Elasticsearch
12 from funcy import compose, partial
13 
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.DEBUG)

# Required setting: module import fails with KeyError when it is unset.
ELASTICSEARCH_URL = os.environ['ELASTICSEARCH_URL']
# Optional setting: None when unset (the __main__ block then uses mkdtemp()).
LOG_OUTPUT_DIR = os.environ.get('LOG_OUTPUT_DIR')
20 
21 
def collect_and_log():
    '''Collect web stats from Elasticsearch and write them to one log file.

    Connects via ``__create_es_connection__``, checks that the cluster
    answers a ping, runs ``search_es`` and writes every returned message to
    ``<LOG_OUTPUT_DIR>/access_log`` with ``write_access_log_file``.

    :raises RuntimeError: if LOG_OUTPUT_DIR is not set, or the
        Elasticsearch ping fails.
    '''
    if not LOG_OUTPUT_DIR:
        # os.path.join(None, ...) would otherwise fail with a bare TypeError.
        raise RuntimeError('LOG_OUTPUT_DIR is not set')
    es_conn = __create_es_connection__()
    # Explicit check: an `assert` here would be stripped under `python -O`.
    if not es_conn.ping():
        raise RuntimeError('Elasticsearch did not answer ping')
    # LOG_OUTPUT_DIR may come from the environment and not exist yet.
    if not os.path.isdir(LOG_OUTPUT_DIR):
        os.makedirs(LOG_OUTPUT_DIR)
    access_log = os.path.join(LOG_OUTPUT_DIR, 'access_log')
    # search -> extract messages -> write them all to `access_log`.
    compose(partial(write_access_log_file, access_log),
            extract_messages_from_results,
            search_es)(es_conn)
30 
31 
def __create_es_connection__():
    '''Return an Elasticsearch client for ELASTICSEARCH_URL using SSL.

    NOTE(review): ssl_assert_hostname=False presumably turns off certificate
    hostname verification, weakening TLS protection; confirm this is
    intentional.
    '''
    tls_options = {'use_ssl': True, 'ssl_assert_hostname': False}
    hosts = [ELASTICSEARCH_URL]
    return Elasticsearch(hosts, **tls_options)
36 
37 
def search_es(es_conn, now=None, size=10000):
    '''Search Elasticsearch for ``odyssey_web`` entries in a one-day window.

    The window runs from 48 hours before *now* to 24 hours before *now*,
    inclusive at both ends.

    :param es_conn: client exposing ``search(body=...)``, e.g. the one built
        by ``__create_es_connection__``.
    :param now: reference time; defaults to ``datetime.now()`` (naive,
        local time).
    :param size: maximum number of hits to return. When the request sets no
        size, Elasticsearch returns only 10 hits. 10000 equals the default
        ``index.max_result_window``; bigger result sets need the scroll API.
    :returns: the raw response of ``es_conn.search``.
    :raises ValueError: if *es_conn* is falsy.
    '''
    if not es_conn:
        # Explicit check: an `assert` would be stripped under `python -O`.
        raise ValueError('search_es requires an Elasticsearch connection')
    if now is None:
        now = datetime.now()
    # NOTE(review): naive local timestamps are compared against the indexed
    # "timestamp" field; if that field is UTC and this host is not, the
    # window is shifted -- confirm.
    window_end = now - timedelta(days=1)
    window_start = window_end - timedelta(days=1)
    query_body = {
        "size": size,
        "query": {
            "bool": {
                "filter": [
                    {"match": {"image_tag": "odyssey_web"}},
                    {"range": {"timestamp": {
                        # pylint: disable=no-member
                        "gte": window_start.isoformat(),
                        "lte": window_end.isoformat()
                    }}},
                ]
            }
        }
    }
    return es_conn.search(body=query_body)
59 
60 
def extract_messages_from_results(results):
    '''Pull the ``message`` field out of each hit of an ES search response.

    :param results: search response with ``results['hits']['hits']``, each
        hit carrying ``hit['_source']['message']``.
    :returns: a generator over the messages, in hit order. A response
        without ``hits`` raises KeyError immediately; a hit lacking
        ``_source``/``message`` raises KeyError only when iteration reaches it.
    '''
    return (hit['_source']['message'] for hit in results['hits']['hits'])
67 
68 
def map_messages_into_bins(messages):
    '''Group log messages into "bins" keyed by their second '-' field.

    The key is the text between the first and second '-' of a message,
    stripped of surrounding whitespace. Messages whose key is empty, or that
    contain no '-' at all, go into the "default" bin.

    :param messages: iterable of log message strings.
    :returns: dict mapping bin key to the list of messages, in input order.
    '''
    bins = {}
    for message in messages:
        # Probably an improper split (heuristic kept from the original).
        fields = message.split('-')
        # Guard: a message without '-' used to raise IndexError.
        key = fields[1].strip() if len(fields) > 1 else ''
        bins.setdefault(key or "default", []).append(message)
    return bins
80 
81 
def process_access_log_bins(bins):
    '''Write each bin to its own ``<key>_access.log`` under LOG_OUTPUT_DIR.

    :param bins: mapping of bin key to log lines, such as the result of
        ``map_messages_into_bins``.
    :raises RuntimeError: if LOG_OUTPUT_DIR is not set.
    '''
    if not LOG_OUTPUT_DIR:
        raise RuntimeError('LOG_OUTPUT_DIR is not set')
    if not os.path.isdir(LOG_OUTPUT_DIR):
        # makedirs (unlike mkdir) also creates missing parent directories.
        os.makedirs(LOG_OUTPUT_DIR)
    for key, lines in bins.items():
        # Keys come from log message text: replace path separators so a key
        # like "../x" or "/etc/x" cannot escape or override LOG_OUTPUT_DIR.
        safe_key = key.replace(os.sep, '_')
        if os.altsep:
            safe_key = safe_key.replace(os.altsep, '_')
        filename = os.path.join(LOG_OUTPUT_DIR, '%s_access.log' % safe_key)
        write_access_log_file(filename, lines)
89 
90 
def write_access_log_file(filename, lines):
    '''Overwrite *filename* with *lines* joined by newlines.

    No trailing newline is added after the last line.

    :param filename: path of the file to (re)create.
    :param lines: iterable of strings; a generator is consumed here.
    :returns: *filename*, unchanged.

    NOTE(review): the file is opened without an explicit encoding, so
    non-ASCII messages depend on the locale's default encoding; confirm.
    '''
    with open(filename, 'w') as handle:
        payload = '\n'.join(lines)
        handle.write(payload)
    return filename
96 
97 
if __name__ == '__main__':
    # Switch the effective UID to www-data so the files created below are
    # owned by that user (os.seteuid normally requires running as root).
    UID = pwd.getpwnam('www-data').pw_uid
    os.seteuid(UID)
    if not LOG_OUTPUT_DIR:
        LOG_OUTPUT_DIR = tempfile.mkdtemp()
    LOGGER.info("Starting Elasticsearch web-stats collection ...")
    # Report where output goes; a mkdtemp() directory is otherwise unknown.
    LOGGER.info("Writing logs to %s", LOG_OUTPUT_DIR)
    collect_and_log()
    LOGGER.info("Done.")
def collect_and_log()
Definition: web_stats.py:22
def extract_messages_from_results(results)
Definition: web_stats.py:61
def write_access_log_file(filename, lines)
Definition: web_stats.py:91
def map_messages_into_bins(messages)
Definition: web_stats.py:69
def process_access_log_bins(bins)
Definition: web_stats.py:82
def search_es(es_conn)
Definition: web_stats.py:38