Odyssey
logs.py
#!/usr/bin/env python
"""
Access Elasticsearch based logs.
"""

import datetime
import random
import requests
import shellish
import time
from . import s3, base

URL_FORMAT = 'https://logs.infra.%(region)s.homecu.io'


class ES(object):
    """ Wrap API access to elasticsearch. """

    def __init__(self, url):
        self._url = url.rstrip('/')
        self._http = self._establish()

    def _establish(self):
        http = requests.Session()
        # XXX: Keep this code internal please.
        _1 = 443, 444, 457, 440, 447, 458, 438, 448, 451, 442
        _2 = 400, 392, 461, 444, 448, 443, 444, 452
        # Use the redirection as our auth success check. There is no
        # 4xx level response to auth failures with oauth2_proxy inside
        # via nginx's auth-request.

        def unfurl(arr):
            return (chr(x - (7 ** 3)) for x in reversed(arr))
        success_url = '/__auth_success_%d__' % (random.random() * 2 ** 64)
        r = http.post(self._url + '/oauth2/sign_in', data={
            "username": ''.join(unfurl(_1)),
            "password": ''.join(unfurl(_2)),
            "rd": success_url
        }, allow_redirects=False)
        if r.status_code not in (301, 302) or \
           r.headers.get('location') != success_url:
            raise Exception("Login failure: [%d] %s" % (r.status_code, r.text))
        return http

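    # Build a constant_score/bool filter query sorted by timestamp, with the
    # _uid field as a deterministic tiebreaker; extra keyword arguments are
    # passed straight through as bool filter clauses (e.g. must=[...]).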
    def search(self, index, type=None, fields=None, start=None, limit=None,
               sort_order='asc', **bool_filters):
        if type is not None:
            urn = '%s/%s' % (index, type)
        else:
            urn = index
        params = {
            "query": {
                "constant_score": {
                    "filter": {
                        "bool": bool_filters
                    }
                }
            },
            "sort": [{
                "timestamp": {"order": sort_order}
            }, {
                "_uid": {"order": "desc"}  # tiebreaker/seen-tracker
            }]
        }
        if start is not None:
            params['from'] = start
        if limit is not None:
            params['size'] = limit
        if fields is not None:
            params['_source'] = fields
        r = self._http.get('%s/%s/_search?pretty' % (self._url, urn),
                           json=params)
        r.raise_for_status()
        data = r.json()
        assert not data['_shards']['failed']
        assert not data['timed_out']
        return data['hits']['hits']


class Logs(base.Command):  # XXX: base class assumed
    """ Dump or follow Odyssey system logs. """

    name = 'logs'
    levels = (
        "debug",
        "info",
        "warning",
        "error",
        "critical",
    )

    def setup_args(self, parser):
        self.add_stack_argument()
        self.table_args_parser = self.add_table_arguments()
        self.add_argument('--follow', '-f', action='store_true',
                          help='Follow mode.')
        self.add_argument('--level', '-l', choices=self.levels,
                          help='Minimum log level.')
        self.add_argument('--image', '-i', help='Only show logs for this '
                          'image.')
        self.add_argument('--highlight', '-hl', nargs='+',
                          help='Highlight phrases found in log messages.')
        self.add_argument('--query', '-q', nargs='+',
                          help='Elasticsearch Query [EXPERT USE]')
        self.add_argument('-n', type=int, default=20,
                          help='Number of lines to initially show.')
        self.add_argument('--verbose', '-v', action='store_true',
                          help='Verbose log output.')
        self.add_argument('--url-override', default=None,
                          help='Override the Elasticsearch URL.')

    def run(self, args):
        if args.url_override:
            url = args.url_override
        else:
            stack = self.get_stack(args.stack)
            if stack is None:
                raise SystemExit('Stack not found: %s' % args.stack)
            url = URL_FORMAT % {"region": stack.region}
        es = ES(url)
        must = [{
            "wildcard": {"containerName": "*%s*" % args.stack}
        }]
        if args.image:
            must.append({"term": {"image_tag": args.image}})
        if args.query:
            must.append({"query_string": {"query": ' '.join(args.query)}})
        if args.level is not None:
            levels = self.levels[self.levels.index(args.level):]
            must.append({"terms": {"priority": levels}})
        rows = list(reversed(es.search(
            'logging-*',
            limit=args.n,
            must=must, sort_order='desc'
        )))
        seen = set(x['sort'][1] for x in rows)
        table_options = self.table_args_parser(args)
        table = self.make_table(args.verbose,
                                args.highlight or [],
                                table_options)
        table.print(rows)
        if not args.follow:
            return
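        # Follow mode: filter on timestamp >= the last row's sort value and
        # dedupe on the _uid sort key via the `seen` set, backing the poll
        # interval off when nothing new arrives.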
        ts_filter = {"gte": 0}
        must.append({
            "range": {
                "timestamp": ts_filter
            }
        })
        interval = 0.200
        while True:
            if rows:
                last_row_sort = rows[-1]['sort']
                ts_filter['gte'] = last_row_sort[0]
            # Note, this handles paging properly but setting limit to a high
            # value is more performant.
            rows = es.search('logging-*', must=must, limit=10000)
            new = False
            for x in rows:
                if x['sort'][1] not in seen:
                    table.print_row(x)
                    seen.add(x['sort'][1])
                    new = True
            if new:
                interval = 0
            elif interval < 30:
                interval += 0.200
            time.sleep(interval)

    def pretty_ts(self, dt):
        buf = []
        if base.localnow() - dt > datetime.timedelta(hours=12):
            buf.append(base.formatdate(dt))
        buf.append(base.formattime(dt, format='%I:%M:%S.%f %p'))
        return ', '.join(buf)

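    # The column accessors below pull values out of each hit's _source
    # document; the verbose layout swaps the image tag column for container
    # name and host columns.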
    def make_table(self, verbose, highlight, table_options):
        colored_levels = {
            "DEBUG": "<dim>DEBUG</dim>",
            "INFO": "<blue>INFO</blue>",
            "WARNING": "<b>WARNING</b>",
            "ERROR": "<red>ERROR</red>",
            "err": "<red>ERROR</red>",
            "CRITICAL": "<blink><b><red>CRITICAL</red></b></blink>",
        }

        def ts_acc(x):
            return self.pretty_ts(base.parse_ts(
                x['_source']['timestamp'] + 'Z'
            ))

        def level_acc(x):
            return colored_levels.get(x['_source']['priority'],
                                      'XXX-%s' % x['_source']['priority'])

        def tag_acc(x):
            return x['_source']['containerTag']

        def msg_acc(r):
            msg = r['_source']['message']
            for hl in highlight:
                msg = msg.replace(hl, '<reverse><b>%s</b></reverse>' % hl)
            return msg

        def hostname_acc(x):
            return x['_source']['hostname'][:19]

        def container_name_acc(x):
            return x['_source']['containerName'][12:24]

        if not verbose:
            config = {
                "headers": [
                    'Time',
                    'Level',
                    'Image',
                    'Message',
                ],
                "columns": [
                    {"minwidth": 13},
                    None,
                    None,
                    None,
                ],
                "accessors": [
                    ts_acc,
                    level_acc,
                    tag_acc,
                    msg_acc,
                ]
            }
        else:
            config = {
                "headers": [
                    'Time',
                    'Level',
                    'Container',
                    'Host',
                    'Message',
                ],
                "columns": [
                    {"minwidth": 13},
                    None,
                    None,
                    None,
                    None,
                ],
                "accessors": [
                    ts_acc,
                    level_acc,
                    container_name_acc,
                    hostname_acc,
                    msg_acc,
                ]
            }
        config = {**config, **table_options}
        return shellish.Table(**config)