2 '''Collect web stats from elasticsearch for later parsing''' 5 from datetime
import datetime
6 from datetime
import timedelta
11 from elasticsearch
import Elasticsearch
12 from funcy
import compose, partial
# Module logger: records at DEBUG and above are emitted through a
# StreamHandler (stderr by default) that is attached at import time.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
LOGGER.addHandler(logging.StreamHandler())

# Required setting: indexing os.environ raises KeyError at import time
# when ELASTICSEARCH_URL is not defined.
ELASTICSEARCH_URL = os.environ['ELASTICSEARCH_URL']

# Optional setting: None when unset; the script entry point substitutes a
# fresh temporary directory in that case.
LOG_OUTPUT_DIR = os.environ.get('LOG_OUTPUT_DIR')
23 '''Collect web stats and write to log''' 24 es_conn = __create_es_connection__()
25 access_log = os.path.join(LOG_OUTPUT_DIR,
'access_log')
27 compose(partial(write_access_log_file, access_log),
28 extract_messages_from_results,
32 def __create_es_connection__():
33 return Elasticsearch([ELASTICSEARCH_URL],
35 ssl_assert_hostname=
False)
39 '''perform es search''' 42 yesterday = now - timedelta(days=1)
43 day_before_yesterday = yesterday - timedelta(days=1)
48 {
"match": {
"image_tag":
"odyssey_web"}},
49 {
"range": {
"timestamp": {
51 "gte": day_before_yesterday.isoformat(),
52 "lte": yesterday.isoformat()
58 return es_conn.search(body=query_body)
def extract_messages_from_results(results):
    '''Pull the log message text out of an Elasticsearch search response.

    :param results: search response mapping; hits are read from
        ``results['hits']['hits']`` and each hit's text from
        ``hit['_source']['message']``.
    :returns: a lazy generator yielding one message per hit.
    :raises KeyError: immediately, if ``results`` lacks ``['hits']['hits']``.
    '''
    # Looked up eagerly (as the outermost iterable of a generator expression
    # would be), so a malformed response fails at call time, not on iteration.
    hit_list = results['hits']['hits']
    return (doc['_source']['message'] for doc in hit_list)
70 '''map log items (messages) into "bins"''' 72 for message
in messages:
74 key = message.split(
'-')[1].strip()
or "default" 76 bins[key].extend([message])
83 '''process bins and write access logs''' 84 if not os.path.exists(LOG_OUTPUT_DIR):
85 os.mkdir(LOG_OUTPUT_DIR)
86 for key
in bins.keys():
87 filename = os.path.join(LOG_OUTPUT_DIR,
'%s_access.log' % key)
def write_access_log_file(filename, lines):
    '''Write out access log.

    Creates or truncates ``filename`` (text mode ``'w'``) and writes the
    items of ``lines`` separated by ``'\\n'``; no newline is added after the
    last item.

    :param filename: path of the log file to (re)write.
    :param lines: iterable of strings, one log entry each.
    '''
    with open(filename, 'w') as handle:
        # Join inside the ``with`` so the file is created/truncated before
        # ``lines`` is consumed, exactly as the one-expression form did.
        joined = '\n'.join(lines)
        handle.write(joined)
98 if __name__ ==
'__main__':
99 UID = pwd.getpwnam(
'www-data').pw_uid
101 if not LOG_OUTPUT_DIR:
102 LOG_OUTPUT_DIR = tempfile.mkdtemp()
103 LOGGER.info(
"Starting Elasticsearch web-stats collection ...")
def extract_messages_from_results(results)
def write_access_log_file(filename, lines)
def map_messages_into_bins(messages)
def process_access_log_bins(bins)