将流数据加载到 Amazon Elasticsearch Service
可以从许多不同的源将流数据加载到 Amazon Elasticsearch Service 域。有些资源(如 Amazon Kinesis Data Firehose 和 Amazon CloudWatch Logs)具有针对 Amazon ES 的内置支持。其他资源(如 Amazon S3、Amazon Kinesis Data Streams 和 Amazon DynamoDB)使用 AWS Lambda 函数作为事件处理程序。Lambda 函数响应新数据的方式是处理数据并将其流式传输到域。
注意
Lambda 支持多种常用编程语言,并且在大多数 AWS 区域中都可用。有关更多信息,请参阅 AWS Lambda Developer Guide 中的构建 Lambda 函数和 AWS General Reference 中的 AWS Lambda 区域。
主题
从 Amazon S3 将流数据加载到 Amazon ES
可以使用 Lambda 从 Amazon S3 将数据发送到 Amazon ES 域。到达 S3 存储桶的新数据将触发事件通知到 Lambda,这将运行自定义代码以执行编制索引。
这种流式传输数据的方式极其灵活。可以为对象元数据编制索引,或者如果对象是纯文本,则对对象正文的部分元素进行解析和编制索引。此节包含一些简单的 Python 示例代码,这些代码使用正则表达式解析日志文件并为匹配项编制索引。
提示
有关 Node.js 中的更多可靠代码,请参阅 GitHub 上的 amazon-elasticsearch-lambda-samples。某些 Lambda 蓝图还包含有用的解析示例。
先决条件
继续操作之前,必须具有以下资源。
先决条件 | 描述 |
---|---|
Amazon S3 存储桶 | 有关更多信息,请参阅 Amazon Simple Storage Service 入门指南 中的创建存储桶。存储桶必须与 Amazon ES 域位于同一个区域。 |
Amazon ES 域 | Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅创建 Amazon ES 域。 |
创建 Lambda 部署程序包
部署程序包为 ZIP 或 JAR 文件,其中包含代码及其依赖项。此节包括 Python 示例代码。对于其他编程语言,请参阅 AWS Lambda Developer Guide 中的创建部署程序包。
创建目录。在此示例中,我们使用名称
s3-to-es
。在名为
sample.py
的目录中创建一个文件:import boto3import reimport requestsfrom requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1service = 'es'credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https://index = 'lambda-s3-index'type = 'lambda-type'url = host + '/' + index + '/' + type headers = { "Content-Type": "application/json" } s3 = boto3.client('s3')# Regular expressions used to parse some simple log linesip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"')# Lambda execution starts heredef handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)
编辑
region
和host
的变量。安装依赖项:
cd s3-to-es pip install requests -t . pip install requests_aws4auth -t .
所有 Lambda 执行环境都已安装 Boto3,因此无需将其包含在部署程序包中。
提示
如果使用 macOS,则这些命令可能无法正常工作。解决方法是将名为
setup.cfg
的文件添加到s3-to-es
目录:[install] prefix=
打包应用程序代码和依赖项:
zip -r lambda.zip *
创建 Lambda 函数
创建部署程序包之后,可以创建 Lambda 函数。创建函数时,选择名称、运行时 (例如,Python 2.7) 和 IAM 角色。IAM 角色定义对函数的权限。有关详细说明,请参阅 AWS Lambda Developer Guide 中的创建示例 Lambda 函数。
此示例假定使用的是控制台。选择 Python 2.7 和具有 S3 读取权限和 Amazon ES 写入权限的角色,如以下屏幕截图中所示。
在创建此函数后,必须添加一个触发器。在此示例中,我们希望代码在日志文件到达 S3 存储桶中时执行:
选择 S3。
选择存储桶。
对于 Event type (事件类型),选择 PUT。
对于 Prefix (前缀),键入
logs/
。对于 Filter pattern (筛选条件模式),键入
.log
。选择 Enable trigger (启用触发器)。
选择 Add。
最后,可以上传部署程序包:
对于 Handler (处理程序),键入
sample.handler
。此设置告知 Lambda 在触发之后应执行的文件 (sample.py
) 和方法 (handler
)。对于 Code entry type (代码输入种类),选择 Upload a .ZIP file (上传 .ZIP 文件),然后按照提示上传部署程序包。
选择 Save。
此时,您具有一整套资源:存储日志文件的存储桶、日志文件添加到存储桶时执行的函数、执行解析和编制索引的代码以及搜索和可视化的 Amazon ES 域。
测试 Lambda 函数
在创建此函数之后,可以通过将文件上传到 Amazon S3 存储桶来测试此函数。使用以下示例日志行创建一个名为 sample.log
的文件:
12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"
将文件上传到 S3 存储桶的 logs
文件夹。有关说明,请参阅 Amazon Simple Storage Service 入门指南 中的向存储桶添加对象。
然后使用 Amazon ES 控制台或 Kibana 验证 lambda-s3-index
索引是否包含两个文档。还可以发出标准搜索请求:
GET https://es-domain/lambda-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }
从 Amazon Kinesis Data Streams 将流数据加载到 Amazon ES
您可以从 Kinesis Data Streams 将流数据加载到 Amazon ES。到达此数据流的新数据将向 Lambda 触发事件通知,这将运行自定义代码以执行索引编制。此节包括一些简单的 Python 示例代码。有关 Node.js 中的更多可靠代码,请参阅 GitHub 上的 amazon-elasticsearch-lambda-samples。
先决条件
继续操作之前,必须具有以下资源。
先决条件 | 说明 |
---|---|
Amazon Kinesis 数据流 | Lambda 函数的事件源。要了解更多信息,请参阅 Kinesis 数据流。 |
Amazon ES 域 | Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅创建 Amazon ES 域。 |
IAM 角色 | 此角色必须具有基本的 Amazon ES、Kinesis 和 Lambda 权限,如以下内容: { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] } 角色必须拥有以下信任关系: { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] } 要了解更多信息,请参阅 IAM 用户指南 中的创建 IAM 角色。 |
创建 Lambda 函数
按照创建 Lambda 部署程序包中的说明操作,但创建一个名为 kinesis-to-es
的目录并对 sample.py
使用以下代码:
import base64import boto3import jsonimport requestsfrom requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1service = 'es'credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https://index = 'lambda-kine-index'type = 'lambda-kine-type'url = host + '/' + index + '/' + type + '/'headers = { "Content-Type": "application/json" }def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'
编辑 region
和 host
的变量。
使用以下命令安装依赖项:
cd kinesis-to-es pip install requests -t . pip install requests_aws4auth -t .
然后按照创建 Lambda 函数中的说明操作,但指定先决条件中的 IAM 角色和以下触发器设置:
Kinesis 流:您的 Kinesis 流
批处理大小:100
起始位置:时间范围
要了解更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南 中的使用 Amazon Kinesis 数据流。
此时,您具有一整套资源:Kinesis 数据流、在流收到新数据并为该数据编制索引后执行的函数、用于搜索和可视化的 Amazon ES 域。
测试 Lambda 函数
创建此函数后,可以通过使用 AWS CLI 将新记录添加到数据流来测试它:
aws kinesis put-record --stream-name es-test --data "My test data." --partition-key partitionKey1 --region us-west-1
然后使用 Amazon ES 控制台或 Kibana 验证 lambda-kine-index
是否包含文档。还可使用以下请求:
GET https://es-domain/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "lambda-kine-type", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }
从 Amazon DynamoDB 将流数据加载到 Amazon ES
可以使用 AWS Lambda 从 Amazon DynamoDB 将数据发送到 Amazon ES 域。到达数据库表的新数据将触发事件通知到 Lambda,这将运行自定义代码以执行编制索引。
先决条件
继续操作之前,必须具有以下资源。
先决条件 | 说明 |
---|---|
DynamoDB Table | 此表包含源数据。有关更多信息,请参阅 Amazon DynamoDB 开发人员指南 中的表的基本操作。 此表必须与 Amazon ES 域驻留在同一个区域并且流设置为新映像。要了解更多信息,请参阅启用流。 |
Amazon ES 域 | Lambda 函数处理数据之后数据的目的地。有关更多信息,请参阅创建 Amazon ES 域。 |
IAM 角色 | 此角色必须具有基本的 Amazon ES、DynamoDB 和 Lambda 执行权限,如以下内容: { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] } 角色必须拥有以下信任关系: { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] } 要了解更多信息,请参阅 IAM 用户指南 中的创建 IAM 角色。 |
创建 Lambda 函数
按照创建 Lambda 部署程序包中的说明操作,但创建一个名为 ddb-to-es
的目录并对 sample.py
使用以下代码:
import boto3import requestsfrom requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1service = 'es'credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, with https://index = 'lambda-index'type = 'lambda-type'url = host + '/' + index + '/' + type + '/'headers = { "Content-Type": "application/json" }def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the Elasticsearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'
编辑 region
和 host
的变量。
使用以下命令安装依赖项:
cd ddb-to-es pip install requests -t . pip install requests_aws4auth -t .
然后按照创建 Lambda 函数中的说明操作,但指定先决条件中的 IAM 角色和以下触发器设置:
表:您的 DynamoDB 表
批处理大小:100
起始位置:时间范围
要了解更多信息,请参阅 Amazon DynamoDB 开发人员指南 中的处理 DynamoDB 表中的新项目。
此时,您具有一整套资源:源数据的 DynamoDB 表、表更改的 DynamoDB 流、在源数据更改并为这些更改编制索引之后执行的函数以及用于搜索和可视化的 Amazon ES 域。
测试 Lambda 函数
创建此函数后,可以通过使用 AWS CLI 将新项目添加到 DynamoDB 表来测试它:
aws dynamodb put-item --table-name es-test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1
然后使用 Amazon ES 控制台或 Kibana 验证 lambda-index
是否包含文档。还可使用以下请求:
GET https://es-domain/lambda-index/lambda-type/00001 { "_index": "lambda-index", "_type": "lambda-type", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }
从 Amazon Kinesis Data Firehose 将流数据加载到 Amazon ES
Kinesis Data Firehose 支持 Amazon ES 作为传输目标。有关如何将流数据加载到 Amazon ES 的说明,请参阅 Amazon Kinesis Data Firehose 开发人员指南 中的创建 Kinesis 数据 Firehose 传输流和选择 Amazon ES 作为目标。
注意
Amazon Kinesis Data Firehose 目前不支持 VPC 域。
在将数据加载到 Amazon ES 之前,可能需要对数据执行转换。要了解有关使用 Lambda 函数执行此任务的更多信息,请参阅此同一指南中的数据转换。
配置传输流时,Kinesis Data Firehose 具有“一键式”IAM 角色,该角色将为传输流提供将数据发送到 Amazon ES、在 Amazon S3 上备份数据以及使用 Lambda 转换数据所需的资源访问权限。由于手动创建此类角色的过程非常复杂,我们建议使用提供的角色。
从 Amazon CloudWatch 将流数据加载到 Amazon ES
您可以使用 CloudWatch Logs 订阅从 CloudWatch Logs 将流数据加载到您的 Amazon ES 域。有关 Amazon CloudWatch 订阅的信息,请参阅使用订阅实时处理日志数据。有关配置信息,请参阅 Amazon CloudWatch 开发人员指南 中的将 CloudWatch Logs 数据流式传输到 Amazon Elasticsearch Service。
将 AWS IoT 中的数据加载到 Amazon ES
您可以使用规则从 AWS IoT 发送数据。要了解更多信息,请参阅 AWS IoT 开发人员指南 中的 Amazon ES 操作。
本文转载自:AWS
作者:AWS
原文链接:https://docs.aws.amazon.com/zh_cn/elasticsearch-service/latest/developerguide/es-aws-integrations.html
推荐阅读