面对海量且持续增加的各式各样的数据对象,你是否有信心知道哪些数据从哪里来以及它如何随时间而变化?采用Hadoop必须考虑数据管理的实际情况,元数据与数据治理成为企业级数据湖的重要部分。

为寻求数据治理的开源解决方案,Hortonworks 公司联合其他厂商与用户于2015年发起数据治理倡议,包括数据分类、集中策略引擎、数据血缘、安全和生命周期管理等方面。Apache Atlas 项目就是这个倡议的结果,社区伙伴持续的为该项目提供新的功能和特性。该项目用于管理共享元数据、数据分级、审计、安全性以及数据保护等方面,努力与Apache Ranger整合,用于数据权限控制策略。

Atlas 是一个可扩展和可扩展的核心基础治理服务集 - 使企业能够有效地和高效地满足 Hadoop 中的合规性要求,并允许与整个企业数据生态系统的集成。

Atlas 的组件可以分为以下主要类别:

Core

此类别包含实现 Atlas 功能核心的组件,包括:

Type System:Atlas 允许用户为他们想要管理的元数据对象定义一个模型。该模型由称为 “类型” 的定义组成。”类型” 的 实例被称为 “实体” 表示被管理的实际元数据对象。类型系统是一个组件,允许用户定义和管理类型和实体。由 Atlas 管理的所有元数据对象(例如Hive表)都使用类型进行建模,并表示为实体。要在 Atlas 中存储新类型的元数据,需要了解类型系统组件的概念。

需要注意的一个关键点是,Atlas 中建模的通用性质允许数据管理员和集成者定义技术元数据和业务元数据。也可以使用 Atlas 的特征来定义两者之间的丰富关系。

Ingest / Export:Ingest 组件允许将元数据添加到 Atlas。类似地,Export 组件暴露由 Atlas 检测到的元数据更改,以作为事件引发,消费者可以使用这些更改事件来实时响应元数据更改。

Graph Engine :在内部,Atlas 通过使用图形模型管理元数据对象。以实现元数据对象之间的巨大灵活性和丰富的关系。图形引擎是负责在类型系统的类型和实体之间进行转换的组件,以及基础图形模型。除了管理图形对象之外,图形引擎还为元数据对象创建适当的索引,以便有效地搜索它们。

Titan:目前,Atlas 使用 Titan 图数据库来存储元数据对象。 Titan 使用两个存储:默认情况下元数据存储配置为 HBase ,索引存储配置为 Solr。也可以通过构建相应的配置文件将元数据存储作为 BerkeleyDB 和 Index 存储使用为 ElasticSearch。元数据存储用于存储元数据对象本身,并且索引存储用于存储元数据属性的索引,其允许高效搜索。

Integration

用户可以使用两种方法管理 Atlas 中的元数据:

API:Atlas 的所有功能通过 REST API 提供给最终用户,允许创建,更新和删除类型和实体。它也是查询和发现通过 Atlas 管理的类型和实体的主要方法。

Messaging:除了 API 之外,用户还可以选择使用基于 Kafka 的消息接口与 Atlas 集成。这对于将元数据对象传输到 Atlas 以及从 Atlas 使用可以构建应用程序的元数据更改事件都非常有用。如果希望使用与 Atlas 更松散耦合的集成,这可以允许更好的可扩展性,可靠性等,消息传递接口是特别有用的。Atlas 使用 Apache Kafka 作为通知服务器用于钩子和元数据通知事件的下游消费者之间的通信。事件由钩子和 Atlas 写到不同的 Kafka 主题。

元数据源

Atlas 支持与许多元数据源的集成。将来还会添加更多集成。目前,Atlas 支持从以下来源获取和管理元数据:

与其它元数据源集成意味着两件事:有一些元数据模型,Atlas 定义本机来表示这些组件的对象。 Atlas 提供了从这些组件中通过实时或批处理模式获取元数据对象的组件。

Apps

由 Atlas 管理的元数据各种应用程序使用,满足许多治理用例。

Atlas Admin UI:该组件是一个基于 Web 的应用程序,允许数据管理员和科学家发现和注释元数据。这里最重要的是搜索界面和 SQL 样的查询语言,可以用来查询由 Atlas 管理的元数据类型和对象。管理 UI 使用 Atlas 的 REST API 来构建其功能。

Tag Based Policies:Apache Ranger 是针对 Hadoop 生态系统的高级安全管理解决方案,与各种 Hadoop 组件具有广泛的集成。通过与 Atlas 集成,Ranger 允许安全管理员定义元数据驱动的安全策略,以实现有效的治理。 Ranger 是由 Atlas 通知的元数据更改事件的消费者。

Business Taxonomy:从元数据源获取到 Atlas 的元数据对象主要是一种技术形式的元数据。为了增强可发现性和治理能力,Atlas 提供了一个业务分类界面,允许用户首先定义一组代表其业务域的业务术语,并将其与 Atlas 管理的元数据实体相关联。业务分类法是一种 Web 应用程序,目前是 Atlas Admin UI 的一部分,并且使用 REST API 与 Atlas 集成。

Type System

Overview

Atlas 允许用户为他们想要管理的元数据对象定义一个模型。该模型由称为 “类型” 的定义组成。被称为 “实体” 的 “类型” 实例表示被管理的实际元数据对象。类型系统是一个组件,允许用户定义和管理类型和实体。由 Atlas 管理的所有元数据对象(例如Hive表)都使用类型进行建模,并表示为实体。要在Atlas中存储新类型的元数据,需要了解类型系统组件的概念。

Types

Atlas中的 “类型” 定义了如何存储和访问特定类型的元数据对象。类型表示了所定义元数据对象的一个或多个属性集合。具有开发背景的用户可以将 “类型” 理解成面向对象的编程语言的 “类” 定义的或关系数据库的 “表模式”。

与 Atlas 本地定义的类型的示例是 Hive 表。 Hive 表用这些属性定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Name: hive_table
MetaType: Class
SuperTypes: DataSet
Attributes:
name: String (name of the table)
db: Database object of type hive_db
owner: String
createTime: Date
lastAccessTime: Date
comment: String
retention: int
sd: Storage Description object of type hive_storagedesc
partitionKeys: Array of objects of type hive_column
aliases: Array of strings
columns: Array of objects of type hive_column
parameters: Map of String keys to String values
viewOriginalText: String
viewExpandedText: String
tableType: String
temporary: Boolean

从上面的例子可以注意到以下几点:

  • Atlas中的类型由 “name” 唯一标识,
  • 类型具有元类型。元类型表示 Atlas 中此模型的类型。 Atlas 有以下几种类型:
    • 基本元类型: Int,String,Boolean等。
    • 枚举元类型
    • 集合元类型:例如Array,Map
    • 复合元类型:Class,Struct,Trait
  • 类型可以从称为 “supertype” 的父类型 “extend” - 凭借这一点,它将包含在 “supertype” 中定义的属性。这允许模型在一组相关类型等之间定义公共属性。这再次类似于面向对象语言如何定义类的超类的概念。 Atlas 中的类型也可以从多个超类型扩展。
    • 在该示例中,每个 hive 表从预定义的超类型(称为 “DataSet”)扩展。稍后将提供关于此预定义类型的更多细节。
  • 具有 “Class”,”Struct” 或 “Trait” 的元类型的类型可以具有属性集合。每个属性都有一个名称(例如 “name”)和一些其他关联的属性。可以使用表达式 type_name.attribute_name 来引用属性。还要注意,属性本身是使用 Atlas 元类型定义的。
    • 在这个例子中,hive_table.name 是一个字符串,hive_table.aliases 是一个字符串数组,hive_table.db 引用一个类型的实例称为 hive_db 等等。
  • 在属性中键入引用(如hive_table.db)。使用这样的属性,我们可以在 Atlas 中定义的两种类型之间的任意关系,从而构建丰富的模型。注意,也可以收集一个引用列表作为属性类型(例如 hive_table.cols,它表示从 hive_table 到 hive_column 类型的引用列表)

Entities

Atlas中的 “实体” 是类 “类型” 的特定值或实例,因此表示真实世界中的特定元数据对象。 回顾我们的面向对象编程语言的类比,”实例” 是某个 “类” 的 “对象”。

实体的示例将是特定的 Hive 表。 说 “Hive” 在 “默认” 数据库中有一个名为 “customers” 的表。 此表将是类型为 hive_table 的 Atlas 中的 “实体”。 通过作为类类型的实例,它将具有作为 Hive 表 “类型” 的一部分的每个属性的值,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
id: "9ba387dd-fa76-429c-b791-ffc338d3c91f"
typeName: “hive_table”
values:
name: "customers"
db: "b42c6cfc-c1e7-42fd-a9e6-890e0adf33bc"
owner: "admin"
createTime: "2016-06-20T06:13:28.000Z"
lastAccessTime: "2016-06-20T06:13:28.000Z"
comment: null
retention: 0
sd: "ff58025f-6854-4195-9f75-3a3058dd8dcf"
partitionKeys: null
aliases: null
columns: ["65e2204f-6a23-4130-934a-9679af6a211f", "d726de70-faca-46fb-9c99-cf04f6b579a6", ...]
parameters: {"transient_lastDdlTime": "1466403208"}
viewOriginalText: null
viewExpandedText: null
tableType: "MANAGED_TABLE"
temporary: false

从上面的例子可以注意到以下几点:

  • 作为 Class Type 实例的每个实体都由唯一标识符 GUID 标识。此 GUID 由 Atlas 服务器在定义对象时生成,并在实体的整个生命周期内保持不变。在任何时间点,可以使用其 GUID 来访问该特定实体。
    • 在本示例中,默认数据库中的 “customers” 表由GUID “9ba387dd-fa76-429c-b791-ffc338d3c91f” 唯一标识
  • 实体具有给定类型,并且类型的名称与实体定义一起提供。
    • 在这个例子中,”customers” 表是一个 “hive_table”。
  • 此实体的值是所有属性名称及其在 hive_table 类型定义中定义的属性的值的映射。
  • 属性值将根据属性的元类型。
    • 基本元类型:整数,字符串,布尔值。例如。 ‘name’=’customers’,’Temporary’=’false’
    • 集合元类型:包含元类型的值的数组或映射。例如。 parameters = {“transient_lastDdlTime”:”1466403208”}
    • 复合元类型:对于类,值将是与该特定实体具有关系的实体。例如。hive 表 “customers” 存在于称为 “default” 的数据库中。

表和数据库之间的关系通过 “db” 属性捕获。因此,”db” 属性的值将是一个唯一标识 hive_db 实体的 GUID,称为 “default”
对于实体的这个想法,我们现在可以看到 Class 和 Struct 元类型之间的区别。类和结构体都组成其他类型的属性。但是,类类型的实体具有 Id 属性(具有GUID值)并且可以从其他实体引用(如 hive_db 实体从 hive_table 实体引用)。 Struct 类型的实例没有自己的身份,Struct 类型的值是在实体本身内嵌入的属性的集合。

Attributes

我们已经看到属性在复合元类型(如 Class 和 Struct)中定义。 但是我们简单地将属性称为具有名称和元类型值。 然而, Atlas 中的属性还有一些属性,定义了与类型系统相关的更多概念。

属性具有以下属性:

1
2
3
4
5
6
7
name: string,
dataTypeName: string,
isComposite: boolean,
isIndexable: boolean,
isUnique: boolean,
multiplicity: enum,
reverseAttributeName: string

以上属性具有以下含义:

  • name - 属性的名称
  • dataTypeName - 属性的元类型名称(本机,集合或复合)
  • isComposite - 是否复合
    • 此标志指示建模的一个方面。如果一个属性被定义为复合,它意味着它不能有一个生命周期与它所包含的实体无关。这个概念的一个很好的例子是构成 hive 表一部分的一组列。由于列在 hive 表之外没有意义,它们被定义为组合属性。
    • 必须在 Atlas 中创建复合属性及其所包含的实体。即,必须与 hive 表一起创建 hive 列。
  • isIndexable - 是否索引
    • 此标志指示此属性是否应该索引,以便可以使用属性值作为谓词来执行查找,并且可以有效地执行查找。
  • isUnique - 是否唯一
    • 此标志再次与索引相关。如果指定为唯一,这意味着为 Titan 中的此属性创建一个特殊索引,允许基于等式的查找。
    • 具有此标志的真实值的任何属性都被视为主键,以将此实体与其他实体区分开。因此,应注意确保此属性在现实世界中模拟独特的属性。
      • 例如,考虑 hive_table 的 name 属性。孤立地,名称不是 hive_table 的唯一属性,因为具有相同名称的表可以存在于多个数据库中。如果 Atlas 在多个集群中存储 hive 表的元数据,即使一对(数据库名称,表名称)也不是唯一的。只有集群位置,数据库名称和表名称可以在物理世界中被视为唯一。
  • multiplicity - 指示此属性是(必需的/可选的/还是可以是多值)的。如果实体的属性值的定义与类型定义中的多重性声明不匹配,则这将是一个约束违反,并且实体添加将失败。因此,该字段可以用于定义元数据信息上的一些约束。

使用上面的内容,让我们扩展下面的 hive 表的属性之一的属性定义。让我们看看称为 “db” 的属性,它表示 hive 表所属的数据库:

1
2
3
4
5
6
7
8
db:
"dataTypeName": "hive_db",
"isComposite": false,
"isIndexable": true,
"isUnique": false,
"multiplicity": "required",
"name": "db",
"reverseAttributeName": null

注意多重性的 “multiplicity” = “required” 约束。 如果没有 db 引用,则不能发送表实体。

1
2
3
4
5
6
7
8
columns:
"dataTypeName": "array<hive_column>",
"isComposite": true,
"isIndexable": true,
“isUnique": false,
"multiplicity": "optional",
"name": "columns",
"reverseAttributeName": null

请注意列的 “isComposite” = true 值。通过这样做,我们指示定义的列实体应该始终绑定到它们定义的表实体。

从这个描述和示例中,您将能够意识到属性定义可以用于影响 Atlas 系统要执行的特定建模行为(约束,索引等)。

系统特定类型及其意义

Atlas 提供了一些预定义的系统类型。我们在前面的章节中看到了一个例子(DataSet)。在本节中,我们将看到所有这些类型并了解它们的意义。

Referenceable:此类型表示可使用名为 qualifiedName 的唯一属性搜索的所有实体。

Asset:此类型包含名称,说明和所有者等属性。名称是必需属性(multiplicity = required),其他是可选的。可引用和资源的目的是为定型器提供在定义和查询其自身类型的实体时强制一致性的方法。拥有这些固定的属性集允许应用程序和用户界面基于约定基于默认情况下他们可以期望的属性的假设。

Infrastructure:此类型扩展了可引用和资产,通常可用于基础设施元数据对象(如群集,主机等)的常用超类型。

DataSet:此类型扩展了可引用和资产。在概念上,它可以用于表示存储数据的类型。在 Atlas 中,hive表,Sqoop RDBMS表等都是从 DataSet 扩展的类型。扩展 DataSet 的类型可以期望具有模式,它们将具有定义该数据集的属性的属性。例如, hive_table 中的 columns 属性。另外,扩展 DataSet 的实体类型的实体参与数据转换,这种转换可以由 Atlas 通过 lineage(或 provenance)生成图形。

Process:此类型扩展了可引用和资产。在概念上,它可以用于表示任何数据变换操作。例如,将原始数据的 hive 表转换为存储某个聚合的另一个 hive 表的 ETL 过程可以是扩展过程类型的特定类型。流程类型有两个特定的属性,输入和输出。输入和输出都是 DataSet 实体的数组。因此,Process 类型的实例可以使用这些输入和输出来捕获 DataSet 的 lineage 如何演变。

Atlas 支持以下 2 种方式搜索元数据:

  • Search using DSL
  • Full-text search

Hive Atlas Bridge

Hive Model

默认 hive 建模在 org.apache.atlas.hive.model.HiveDataModelGenerator 中可用。 它定义以下类型:

1
2
3
4
5
6
7
8
hive_db(ClassType) - super types [Referenceable] - attributes [name, clusterName, description, locationUri, parameters, ownerName, ownerType]
hive_storagedesc(ClassType) - super types [Referenceable] - attributes [cols, location, inputFormat, outputFormat, compressed, numBuckets, serdeInfo, bucketCols, sortCols, parameters, storedAsSubDirectories]
hive_column(ClassType) - super types [Referenceable] - attributes [name, type, comment, table]
hive_table(ClassType) - super types [DataSet] - attributes [name, db, owner, createTime, lastAccessTime, comment, retention, sd, partitionKeys, columns, aliases, parameters, viewOriginalText, viewExpandedText, tableType, temporary]
hive_process(ClassType) - super types [Process] - attributes [name, startTime, endTime, userName, operationType, queryText, queryPlan, queryId]
hive_principal_type(EnumType) - values [USER, ROLE, GROUP]
hive_order(StructType) - attributes [col, order]
hive_serde(StructType) - attributes [name, serializationLib, parameters]

使用唯一的限定名称创建和去重复实体。它们提供命名空间,也可以用于 query/lineage。请注意,dbName,tableName 和 columnName 应为小写。 clusterName 解释如下。

  • hive_db - attribute qualifiedName - @
  • hive_table - attribute qualifiedName - .@
  • hive_column - attribute qualifiedName - ..@
  • hive_process - attribute name - - 小写的修剪查询字符串

导入 Hive Metadata

org.apache.atlas.hive.bridge.HiveMetaStoreBridge 使用 org.apache.atlas.hive.model.HiveDataModelGenerator 中定义的模型将 Hive 元数据导入 Atlas。 import-hive.sh 命令可以用来方便这一点。脚本需要 Hadoop 和 Hive 类路径 jar。 对于 Hadoop jar,请确保环境变量 HADOOP_CLASSPATH 已设置。另一种方法是将 HADOOP_HOME 设置为指向 Hadoop 安装的根目录同样,对于 Hive jar,将 HIVE_HOME 设置为 Hive 安装的根目录将环境变量 HIVE_CONF_DIR 设置为 Hive 配置目录复制 ${atlas-conf}/atlas-application.properties 到 hive conf 目录

1
Usage: <atlas package>/hook-bin/import-hive.sh

日志位于 ${atlas package}/logs/import-hive.log

如果要在 kerberized 集群中导入元数据,则需要运行以下命令:

1
<atlas package>/hook-bin/import-hive.sh -Dsun.security.jgss.debug=true -Djavax.security.auth.useSubjectCredsOnly=false -Djava.security.krb5.conf=[krb5.conf location] -Djava.security.auth.login.config=[jaas.conf location]

Hive Hook

Hive 在使用 hive hook 的 hive 命令执行上支持侦听器。 这用于在 Atlas 中使用 org.apache.atlas.hive.model.HiveDataModelGenerator 中定义的模型添加/更新/删除实体。 hive hook 将请求提交给线程池执行器,以避免阻塞命令执行。 线程将实体作为消息提交给通知服务器,并且服务器读取这些消息并注册实体。 按照 hive 设置中的这些说明为 Atlas 添加 hive hook :

  • Set-up atlas hook in hive-site.xml of your hive configuration:
1
2
3
4
<property>
<name>hive.exec.post.hooks</name>
<value>org.apache.atlas.hive.hook.HiveHook</value>
</property>
1
2
3
4
<property>
<name>atlas.cluster.name</name>
<value>primary</value>
</property>
  • Add ‘export HIVE_AUX_JARS_PATH=/hook/hive’ in hive-env.sh of your hive configuration
  • Copy /atlas-application.properties to the hive conf directory.

/atlas-application.properties中的以下属性控制线程池和通知详细信息:

  • atlas.hook.hive.synchronous - boolean,true来同步运行钩子。 默认false。 建议设置为false,以避免 hive 查询完成中的延迟。
  • atlas.hook.hive.numRetries - 通知失败的重试次数。 默认值 3
  • atlas.hook.hive.minThreads - 核心线程数。 默认值 5
  • atlas.hook.hive.maxThreads - 最大线程数。 默认值 5
  • atlas.hook.hive.keepAliveTime - 保持活动时间以毫秒为单位。 默认 10
  • atlas.hook.hive.queueSize - 线程池的队列大小。 默认 10000

参考 Configuration 通知相关配置

Column Level Lineage

从 atlas-0.8-incubating 版本开始,在 Atlas 中捕获列 lineage

Model

  • ColumnLineageProcess 类型是 Process 的子类
  • 这将输出列与一组输入列或输入表相关联
  • Lineage 还捕获 Dependency 的类型:当前的值是 SIMPLE,EXPRESSION,SCRIPT
    • SIMPLE依赖: 意味着输出列具有与输入相同的值
    • EXPRESSION依赖: 意味着输出列被输入列上的运行时中的一些表达式(例如Hive SQL表达式)转换。
    • SCRIPT依赖: 表示输出列由用户提供的脚本转换。
  • EXPRESSION 依赖的情况下,表达式属性包含字符串形式的表达式
  • 由于 Process 链接输入和输出 DataSet,我们使 Column 成为 DataSet 的子类

Examples

对于下面的简单 CTAS:

1
create table t2 as select id, name from T1

lineage 为

Extracting Lineage from Hive commands

  • HiveHook 将 HookContext 中的 LineageInfo 映射到 Column lineage 实例

  • Hive 中的 LineageInfo 为最终的 FileSinkOperator 提供 Column lineage ,将它们链接到 Hive 查询中的输入列

NOTE

在将 HIVE-13112 的补丁应用于 Hive 源之后,列级别 lineage 与 Hive 版本1.2.1配合使用

Limitations

  • 由于数据库名,表名和列名在 hive 中不区分大小写,因此实体中的对应名称为小写。 因此,任何搜索 API 都应该在查询实体名称时使用小写
  • 以下 hive 操作由 hive hook 当前捕获
    • create database
    • create table/view, create table as select
    • load, import, export
    • DMLs (insert)
    • alter database
    • alter table (skewed table information, stored as, protection is not supported)
    • alter view

Sqoop Atlas Bridge

Sqoop Model

默认的 Sqoop 建模在 org.apache.atlas.sqoop.model.SqoopDataModelGenerator 中可用。 它定义以下类型:

1
2
3
4
sqoop_operation_type(EnumType) - values [IMPORT, EXPORT, EVAL]
sqoop_dbstore_usage(EnumType) - values [TABLE, QUERY, PROCEDURE, OTHER]
sqoop_process(ClassType) - super types [Process] - attributes [name, operation, dbStore, hiveTable, commandlineOpts, startTime, endTime, userName]
sqoop_dbdatastore(ClassType) - super types [DataSet] - attributes [name, dbStoreType, storeUse, storeUri, source, description, ownerName]

使用唯一的限定名称创建和去重复实体。 它们提供命名空间,也可以用于查询:sqoop_process - attribute name - sqoop-dbStoreType-storeUri-endTime sqoop_dbdatastore - attribute name - dbStoreType-connectorUrl-source

Sqoop Hook

Sqoop 添加了一个 SqoopJobDataPublisher,在完成导入作业后将数据发布到 Atlas。 现在 sqoopHook 只支持hiveImport。 这用于使用 org.apache.atlas.sqoop.model.SqoopDataModelGenerator 中定义的模型在 Atlas 中添加实体。 按照 sqoop 设置中的以下说明在 ${sqoop-conf}/sqoop-site.xml 中为 Atlas 添加 sqoop 钩子:

  • Sqoop Job publisher class. Currently only one publishing class is supported

sqoop.job.data.publish.class org.apache.atlas.sqoop.hook.SqoopHook

  • Atlas cluster name

atlas.cluster.name

  • 复制 ${atlas-conf}/atlas-application.properties 到 sqoop 的配置文件夹 ${sqoop-conf}/
  • Link ${atlas-home}/hook/sqoop/*.jar in sqoop libRefer Configuration for notification related configurations

Limitations

  • 目前 sqoop hook 只支持 hiveImport 这一种 sqoop 操作

Falcon Atlas Bridge

Falcon Model

默认的falcon建模在 org.apache.atlas.falcon.model.FalconDataModelGenerator. 它可以定义以下类型:

1
2
3
4
5
falcon_cluster(ClassType) - super types [Infrastructure] - attributes [timestamp, colo, owner, tags]
falcon_feed(ClassType) - super types [DataSet] - attributes [timestamp, stored-in, owner, groups, tags]
falcon_feed_creation(ClassType) - super types [Process] - attributes [timestamp, stored-in, owner]
falcon_feed_replication(ClassType) - super types [Process] - attributes [timestamp, owner]
falcon_process(ClassType) - super types [Process] - attributes [timestamp, runs-on, owner, tags, pipelines, workflow-properties]

为 falcon 进程定义的每个集群创建一个 falcon_process 实体。

使用唯一的 qualifiedName 属性创建和去重复实体。 它们提供命名空间,也可以用于查询/沿袭。 唯一的属性是:

  • falcon_process - @
  • falcon_cluster -
  • falcon_feed - @
  • falcon_feed_creation -
  • falcon_feed_replication -

Falcon Hook

Falcon 支持在 falcon 实体提交上的侦听器。 这用于在 Atlas 中使用 org.apache.atlas.falcon.model.FalconDataModelGenerator 中定义的模型添加实体。 hook 将请求提交给线程池执行器,以避免阻塞命令执行。 线程将实体作为消息提交给通知服务器,并且服务器读取这些消息并注册实体。

  • Add ‘org.apache.atlas.falcon.service.AtlasService’ to application.services in ${falcon-conf}/startup.properties
  • Link falcon hook jars in falcon classpath - ‘ln -s ${atlas-home}/hook/falcon/* ${falcon-home}/server/webapp/falcon/WEB-INF/lib/‘
  • In ${falcon_conf}/falcon-env.sh, set an environment variable as follows:
1
export FALCON_SERVER_OPTS="<atlas_home>/hook/falcon/*:$FALCON_SERVER_OPTS"

The following properties in ${atlas-conf}/atlas-application.properties control the thread pool and notification details:

  • atlas.hook.falcon.synchronous - boolean, true to run the hook synchronously. default false
  • atlas.hook.falcon.numRetries - number of retries for notification failure. default 3
  • atlas.hook.falcon.minThreads - core number of threads. default 5
  • atlas.hook.falcon.maxThreads - maximum number of threads. default 5
  • atlas.hook.falcon.keepAliveTime - keep alive time in msecs. default 10
  • atlas.hook.falcon.queueSize - queue size for the threadpool. default 10000

Refer Configuration for notification related configurations

Limitations

  • 在 falcon 集群实体中,使用的集群名称应该跨诸如 hive,falcon,sqoop 等组件是统一的。如果与 ambari 一起使用,则应该使用 ambari 集群名称用于集群实体

Storm Atlas Bridge

Introduction

Apache Storm 是一个分布式实时计算系统。 Storm 使得容易可靠地处理无界的数据流,为实时处理 Hadoop 对批处理所做的工作。 该过程实质上是节点的 DAG,其被称为 topology

Apache Atlas 是一个元数据存储库,支持端到端数据沿袭,搜索和关联业务分类。

这种集成的目的是推动操作 topology 元数据以及基础数据源,目标,推导过程和任何可用的业务上下文,以便 Atlas 可以捕获此 topology 的 lineage。

在此过程中有2个部分详述如下:

  • Data model to represent the concepts in Storm
  • Storm Atlas Hook to update metadata in Atlas

Storm Data Model

数据模型在 Atlas 中表示为 Types。 它包含 topology 图中各种节点的描述,例如 spouts 和 bolts 以及相应的生产者和消费者类型。

在Atlas中添加以下类型。

  • storm_topology - 表示粗粒度拓扑。storm_topology 来自于 Atlas 过程类型,因此可用于通知 Atlas 关于 lineage。
  • 添加以下数据集 - kafka_topic,jms_topic,hbase_table,hdfs_data_set。 这些都来自Atlas Dataset类型,因此形成谱系图的端点。
  • storm_spout - 具有输出的数据生产者,通常为Kafka,JMS
  • storm_bolt - 具有输入和输出的数据使用者,通常为Hive,HBase,HDFS等。

Storm Atlas hook自动注册依赖模型,如Hive数据模型,如果它发现这些是不为Atlas服务器所知。

每个类型的数据模型在类定义org.apache.atlas.storm.model.StormDataModel中描述。

Storm Atlas Hook

当在 Storm 中成功注册新 topology 时,通知 Atlas。 Storm 在 Storm 客户端提供了一个钩子,backtype.storm.ISubmitterHook,用于提交一个 Storm topology。

Storm Atlas hook 拦截 hook 后执行,并从 topology 中提取元数据,并使用定义的类型更新 Atlas。 Atlas 在org.apache.atlas.storm.hook.StormAtlasHook 中实现了 Storm 客户端 hook 接口。

Limitations

以下内容适用于集成的第一个版本。

  • 只有新的 topology 提交已注册到 Atlas,任何生命周期变化都不会反映在 Atlas 中。
  • 当为要捕获的元数据提交 Storm topology 时,Atlas 服务器需要在线。
  • hook 目前不支持捕获自定义 spouts 和 bolts 的 lineage。

Installation

Storm Atlas Hook 需要在客户端手动安装在 Storm 在:$ATLAS_PACKAGE/hook/storm

Storm Atlas Hook 需要复制到 $STORM_HOME/extlib。 使用 storm 安装路径替换 STORM_HOME。

在将安装了 atlas hook 到 Storm 后重新启动所有守护进程。

Configuration

Storm Configuration

Storm Atlas Hook 需要在 Storm 客户端 $STORM_HOME/conf/storm.yaml 进行配置:

1
storm.topology.submission.notifier.plugin.class: "org.apache.atlas.storm.hook.StormAtlasHook"

还设置一个 “集群名称”,将用作在 Atlas 中注册的对象的命名空间。 此名称将用于命名 Storm topology,spouts 和 bolts。

其他对象(如 Dataset)应该理想地用生成它们的组件的集群名称来标识。 例如, Hive 表和数据库应该使用在 Hive 中设置的集群名称来标识。 如果 Hive 配置在客户端上提交的 Storm topology jar 中可用,并且在那里定义了集群名称,Storm Atlas hook 将选择此选项。 对于 HBase 数据集,这种情况类似。 如果此配置不可用,将使用在 Storm 配置中设置的集群名称。

1
atlas.cluster.name: "cluster_name"

$STORM_HOME/conf/storm_env.ini 中, 设置以下环境变量:

1
STORM_JAR_JVM_OPTS:"-Datlas.conf=$ATLAS_HOME/conf/"

将 ATLAS_HOME 指向 ATLAS 的安装目录.

你也可以通过程序对 Storm 进行如下配置:

1
2
3
4
Config stormConf = new Config();
...
stormConf.put(Config.STORM_TOPOLOGY_SUBMISSION_NOTIFIER_PLUGIN,
org.apache.atlas.storm.hook.StormAtlasHook.class.getName());