Commit ff080a0c authored by Chris Merrett's avatar Chris Merrett

Initial commit

parents
tf_mod_aws_elasticsearch_cleanup
================================
A Terraform module to ensure that ElasticSearch does not run out of disk space.
What this module does
---------------------
- Creates a named Lambda function that handles the deletion of items
- Creates an IAM role and policy that grants the function access to ES
- Creates a Cloudwatch event rule that triggers the function on a schedule
What this module doesn't do
---------------------------
- Create an SNS topic to send failure notifications to
Input Variables
---------------
*Required*
* `es_endpoint` - Elasticsearch endpoint.
* `index` - Prefix of the index names. e.g. `logstash` if your indices look
like `logstash-2017.10.30`.
* `delete_after` - Number of days of indices to keep; older indices are deleted. Defaults to `7` if omitted.
* `index_format` - Variable section of the index names. e.g. `%Y.%m.%d` if
your indices look like `logstash-2017.10.30`.
*Optional*
* `sns_alert` - SNS topic ARN to send failure alerts to.
* `prefix` - A prefix for the resource names, this helps create multiple
instances of this stack for different environments and regions.
* `schedule` - Schedule expression for running the cleanup function.
* `python_version` - Python version of the Lambda runtime. Defaults to `2.7`.
The default `schedule` is once a day at 03:00 GMT.
See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html
Usage
-----
```
module "es_cleanup" {
source = "git::https://git.steamhaus.co.uk/steamhaus/tf_mod_aws_elasticsearch_cleanup"
prefix = "test-eu-west-1-"
es_endpoint = "test-es-XXXXXXX.eu-west-1.es.amazonaws.com"
sns_alert = "arn:aws:sns:eu-west-1:123456789012:alertme"
index = "logstash"
delete_after = 7
index_format = "%Y.%m.%d"
python_version = "3.6"
}
```
# Scheduled CloudWatch Events rule that drives the cleanup.
# The rule name is namespaced with var.prefix and the schedule expression
# is taken verbatim from var.schedule.
resource "aws_cloudwatch_event_rule" "schedule" {
name = "${var.prefix}es-cleanup-execution-schedule"
description = "es-cleanup execution schedule"
schedule_expression = "${var.schedule}"
}
# Attaches the es_cleanup Lambda function as the target of the schedule rule.
resource "aws_cloudwatch_event_target" "es_cleanup" {
target_id = "${var.prefix}lambda-es-cleanup"
rule = "${aws_cloudwatch_event_rule.schedule.name}"
arn = "${aws_lambda_function.es_cleanup.arn}"
}
# Grants events.amazonaws.com permission to invoke the es_cleanup function.
# source_arn limits the grant to invocations coming from the schedule rule.
resource "aws_lambda_permission" "allow_cloudwatch" {
statement_id = "AllowExecutionFromCloudWatch"
action = "lambda:InvokeFunction"
function_name = "${aws_lambda_function.es_cleanup.arn}"
principal = "events.amazonaws.com"
source_arn = "${aws_cloudwatch_event_rule.schedule.arn}"
}
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
This AWS Lambda function deletes old Elasticsearch indices
"""
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import create_credential_resolver
from botocore.session import get_session
from botocore.vendored.requests import Session
import sys
if sys.version_info[0] == 3:
from urllib.request import quote
else:
from urllib import quote
import datetime
import json
import time
import os
class ES_Exception(Exception):
    """Error carrying the HTTP status code and body of a failed ES request.

    Attributes:
        status_code (int): HTTP status code of the response
        payload: body of the response
    """

    # Class-level fallbacks, overridden per instance in __init__.
    status_code = 0
    payload = ""

    def __init__(self, status_code, payload):
        """Build the exception and its human-readable message.

        Args:
            status_code (int): HTTP status code of the response
            payload: body of the response
        """
        message = "ES_Exception: status_code={}, payload={}".format(
            status_code, payload)
        super(ES_Exception, self).__init__(message)
        self.status_code = status_code
        self.payload = payload
class ES_Cleanup(object):
    """Client used by the Lambda handler to list and delete ES indices.

    Configuration is read from environment variables in ``__init__``:
    ``es_endpoint`` (mandatory), ``index``, ``delete_after``,
    ``es_max_retry``, ``index_format`` and ``sns_alert``.
    """

    name = "lambda_es_cleanup"

    def __init__(self, event, context):
        """Main Class init

        Args:
            event (dict): AWS Cloudwatch Scheduled Event
            context (object): AWS running context

        Raises:
            Exception: if the ``es_endpoint`` environment variable is not set
        """
        self.report = []
        self.event = event
        self.context = context
        # send_error() tags its messages with the account id; take it from
        # the event's "account" key and fall back to a placeholder so that
        # error reporting itself can never fail on a missing attribute.
        if isinstance(event, dict):
            self.cur_account = event.get("account", "unknown")
        else:
            self.cur_account = "unknown"

        self.cfg = {}
        self.cfg["es_endpoint"] = os.environ.get("es_endpoint", None)
        # Comma-separated list of index prefixes; "all" matches every index.
        self.cfg["index"] = os.environ.get("index", "all").split(",")
        self.cfg["delete_after"] = int(os.environ.get("delete_after", 15))
        self.cfg["es_max_retry"] = int(os.environ.get("es_max_retry", 3))
        self.cfg["index_format"] = os.environ.get("index_format", "%Y.%m.%d")
        self.cfg["sns_alert"] = os.environ.get("sns_alert", "")

        if not self.cfg["es_endpoint"]:
            raise Exception("[es_endpoint] OS variable is not set")

    def send_to_es(self, path, method="GET", payload=None):
        """Send a Sigv4 signed request to Amazon Elasticsearch Service

        Responses with a 5xx status are retried with exponential backoff, up
        to ``es_max_retry`` attempts (at least one attempt is always made).
        Any other non-2xx status is raised immediately.

        Args:
            path (str): path to send to ES
            method (str, optional): HTTP method default:GET
            payload (dict, optional): additional payload used during POST or
                PUT. Defaults to an empty dict.

        Returns:
            json answer decoded with ``json.loads``

        Raises:
            ES_Exception: non-2xx answer from ES; for 5xx answers the
                exception of the last attempt is raised once the retries
                are exhausted
        """
        # Avoid a shared mutable default argument.
        if payload is None:
            payload = {}
        if not path.startswith("/"):
            path = "/" + path

        endpoint = self.cfg["es_endpoint"]
        # The region is the second dot-separated label of the endpoint,
        # e.g. "eu-west-1" in "name.eu-west-1.es.amazonaws.com".
        es_region = endpoint.split(".")[1]
        url = "https://%s%s?pretty&format=json" % (endpoint, quote(path))

        # Credentials do not change between attempts; resolve them once.
        credentials = create_credential_resolver(
            get_session()).load_credentials()

        attempts = max(1, int(self.cfg["es_max_retry"]))
        last_error = None
        for attempt in range(attempts):
            if attempt > 0:
                # Exponential backoff: 0.2s, 0.4s, 0.8s, ...
                time.sleep((2 ** attempt) * .1)

            # The request is rebuilt and re-signed for every attempt.
            req = AWSRequest(
                method=method,
                url=url,
                data=payload,
                headers={'Host': endpoint})
            SigV4Auth(credentials, 'es', es_region).add_auth(req)

            res = Session().send(req.prepare())
            if 200 <= res.status_code <= 299:
                return json.loads(res.content)

            last_error = ES_Exception(res.status_code, res.content)
            if not 500 <= res.status_code <= 599:
                # Not a server-side error: retrying will not help.
                raise last_error

        # Every attempt ended in a 5xx answer; surface the failure instead
        # of silently returning None to the caller.
        raise last_error

    def send_error(self, msg):
        """Print an error and, when ``sns_alert`` is configured, publish it
        to that SNS topic

        Args:
            msg (str): error string

        Returns:
            None
        """
        _msg = "[%s][%s] %s" % (self.name, self.cur_account, msg)
        print(_msg)
        if self.cfg["sns_alert"] != "":
            # ARN layout: arn:partition:service:region:account-id:resource,
            # so the region is field 3 (field 4 is the account id).
            sns_region = self.cfg["sns_alert"].split(":")[3]
            # Use the botocore session this module already imports.
            sns = get_session().create_client("sns", region_name=sns_region)
            sns.publish(TopicArn=self.cfg["sns_alert"], Message=_msg)

    def delete_index(self, index_name):
        """ES DELETE specific index

        Args:
            index_name (str): Index name

        Returns:
            ES answer, as returned by send_to_es
        """
        return self.send_to_es(index_name, "DELETE")

    def get_indices(self):
        """ES Get indices

        Returns:
            ES answer to ``/_cat/indices`` requested with ``format=json``,
            as returned by send_to_es. lambda_handler iterates over it and
            reads the ``"index"`` key of every element.
        """
        return self.send_to_es("/_cat/indices")
def lambda_handler(event, context):
    """Main Lambda function

    Deletes every index whose name ends in ``-<date>`` when that date is on
    or before today minus ``delete_after`` days. The date is parsed with
    ``index_format``; indices whose suffix is not a date in that format are
    left untouched. Only indices whose prefix is listed in the ``index``
    configuration are considered, unless that list contains ``all``.

    Args:
        event (dict): AWS Cloudwatch Scheduled Event
        context (object): AWS running context

    Returns:
        None
    """
    es = ES_Cleanup(event, context)
    index_format = es.cfg["index_format"]
    wanted = es.cfg["index"]
    match_all = "all" in wanted

    # Index cutoff definition, remove older than this date
    earliest_to_keep = datetime.date.today() - datetime.timedelta(
        days=int(es.cfg["delete_after"]))
    # Round-trip the cutoff through index_format so that it has the same
    # granularity as the dates parsed from the index names.
    cutoff = datetime.datetime.strptime(
        earliest_to_keep.strftime(index_format), index_format)

    for index in es.get_indices():
        full_name = index["index"]
        if full_name == ".kibana":
            # ignore .kibana index
            continue

        # "logstash-2017.10.30" -> ("logstash", "2017.10.30"); a name
        # without "-" yields an empty prefix and the whole name as suffix.
        idx_name, _, idx_date = full_name.rpartition("-")
        if not (match_all or idx_name in wanted):
            continue

        try:
            idx_day = datetime.datetime.strptime(idx_date, index_format)
        except ValueError:
            # The suffix is not a date (e.g. "foo-1"). Comparing it as a
            # plain string could sort it below the cutoff and delete an
            # unrelated index, so skip it.
            continue

        if idx_day <= cutoff:
            print("Deleting index: %s" % full_name)
            es.delete_index(full_name)
if __name__ == "__main__":
    # Manual run outside of AWS Lambda: feed the handler a sample
    # Cloudwatch Scheduled Event and an empty context.
    event = {
        "account": "123456789012",
        "region": "eu-west-1",
        "detail": {},
        "detail-type": "Scheduled Event",
        "source": "aws.events",
        "time": "1970-01-01T00:00:00Z",
        "id": "cdc73f9d-aea9-11e3-9d5a-835b769c0d9c",
        "resources": [
            "arn:aws:events:us-east-1:123456789012:rule/my-schedule",
        ],
    }
    lambda_handler(event, "")
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "LambdaLogCreation",
"Effect": "Allow",
"Action": ["logs:*"],
"Resource": "arn:aws:logs:*:*:*"
},
{
"Sid": "ESPermission",
"Effect": "Allow",
"Action": [
"es:*"
],
"Resource": "*"
}
]
}
\ No newline at end of file
# Loads files/es_policy.json from the module directory as a template.
# No template variables are passed in.
data "template_file" "policy" {
template = "${file("${path.module}/files/es_policy.json")}"
}
# IAM policy for the cleanup function; the document is the rendered
# es_policy.json template. The policy name is namespaced with var.prefix.
resource "aws_iam_policy" "policy" {
name = "${var.prefix}es-cleanup"
path = "/"
description = "Policy for es-cleanup Lambda function"
policy = "${data.template_file.policy.rendered}"
}
# Execution role for the cleanup function. The trust policy below allows
# lambda.amazonaws.com to assume the role via sts:AssumeRole.
resource "aws_iam_role" "role" {
name = "${var.prefix}es-cleanup"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
}
# Attaches the es-cleanup policy to the function's execution role.
resource "aws_iam_role_policy_attachment" "policy_attachment" {
role = "${aws_iam_role.role.name}"
policy_arn = "${aws_iam_policy.policy.arn}"
}
# Zips files/es-cleanup.py into es-cleanup.zip inside the module directory.
# The archive's base64 SHA-256 is used as source_code_hash by the function.
data "archive_file" "es_cleanup_lambda" {
type = "zip"
source_file = "${path.module}/files/es-cleanup.py"
output_path = "${path.module}/es-cleanup.zip"
}
# The cleanup Lambda function, deployed from the es-cleanup.zip archive.
# - runtime is "python" followed by var.python_version
# - handler points at lambda_handler in es-cleanup.py
# - timeout is 300 seconds
# The environment variables below are the ones es-cleanup.py reads through
# os.environ; es_max_retry is not set here, so the script's default applies.
resource "aws_lambda_function" "es_cleanup" {
filename = "${path.module}/es-cleanup.zip"
function_name = "${var.prefix}es-cleanup"
timeout = 300
runtime = "python${var.python_version}"
role = "${aws_iam_role.role.arn}"
handler = "es-cleanup.lambda_handler"
source_code_hash = "${data.archive_file.es_cleanup_lambda.output_base64sha256}"
environment {
variables = {
es_endpoint = "${var.es_endpoint}"
index = "${var.index}"
delete_after = "${var.delete_after}"
index_format = "${var.index_format}"
sns_alert = "${var.sns_alert}"
}
}
}
# Module inputs. Variables without a default are required.

variable "prefix" {
  description = "Prefix for the resource names; lets you create multiple instances of this stack for different environments and regions."
  default     = ""
}

variable "schedule" {
  description = "Schedule expression for running the cleanup function. The default runs once a day at 03:00."
  default     = "cron(0 3 * * ? *)"
}

variable "sns_alert" {
  description = "SNS topic ARN to send failure alerts to. Empty disables alerts."
  default     = ""
}

variable "es_endpoint" {
  description = "Elasticsearch endpoint, e.g. test-es-XXXXXXX.eu-west-1.es.amazonaws.com."
}

variable "index" {
  description = "Prefix of the index names, e.g. logstash if your indices look like logstash-2017.10.30."
}

variable "delete_after" {
  description = "Number of days of indices to keep; older indices are deleted."
  default     = 7
}

variable "index_format" {
  description = "Variable section of the index names, e.g. %Y.%m.%d if your indices look like logstash-2017.10.30."
}

variable "python_version" {
  description = "Python version of the Lambda runtime; appended to \"python\" to form the runtime name."
  default     = "2.7"
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment