Contents

机器学习-FATE联邦学习-教程

github FATE官方文档

fate 官网

fate 官网教程

fate 算法开发指南

算法参数

FATE组件官网

pipline官方文档

教程

部署

单机部署

官网安装页面

使用Docker镜像安装FATE

作者本地环境:ubuntu==18.04LTS,docker==20.10.17,FATE==1.7.3

拉取镜像

1
docker pull federatedai/standalone_fate:1.7.3

启动镜像

1
docker run -it --name standalone_fate -p 8080:8080 federatedai/standalone_fate:1.7.3

之后就可以通过docker exec登录FATE容器了,在.bashrc里面添加alias来简化登录,启动和关闭容器过程

1
2
3
alias fatebash="docker container exec -it 2fe0497f6846 /bin/bash"
alias fatestart="docker container start 2fe0497f6846"
alias fatestop="docker container stop 2fe0497f6846"

之后就可以直接通过fatebash登录容器

百度文库

另外可以使用vscode的docker插件和remote插件来更加简化登录启动和关闭容器(强烈推荐),另外这两个插件还提供了几乎所有的容器操作以及查看和修改容器文件的操作

登录容器后测试

1
2
3
4
5
6
source bin/init_env.sh
# Toy测试
flow test toy -gid 10000 -hid 10000 # success to calculate secure_sum, it is 2000.0

# 单元测试
fate_test unittest federatedml --yes # there are 0 failed test

文档

FATE设计

架构

逻辑架构
https://github.com/FederatedAI/FATE/raw/master/doc/architecture/images/fate_architecture_overview.png
overview
物理架构

主要使用eggroll物理架构

https://github.com/FederatedAI/FATE/raw/master/doc/architecture/images/fate_on_eggroll_architecture.png
eggroll

组件

FATE组件官网

  1. 联邦统计: 包括隐私交集计算,并集计算,皮尔逊系数, PSI等
  2. 联邦信息检索:基于OT的PIR(SIR)
  3. 联邦特征工程:包括联邦采样,联邦特征分箱,联邦特征选择等
  4. 联邦机器学习算法:包括横向和纵向的联邦LR, GBDT, DNN,迁移学习, 无监督学习,纵向半监督学习等
  5. 模型评估:提供对二分类,多分类,回归评估,聚类评估,联邦和单边对比评估
  6. 安全协议:提供了多种安全协议,以进行更安全的多方交互计算

pipline也可以作为组件被添加到新pipline。

算法参数

开发资源

开发指南

为 FATE 开发可运行的算法模块(py文件)

对于xxx算法模块

1. 定义此模块将使用的参数对象

参数对象是将用户定义的运行时参数传递给开发模块的唯一途径,每个模块都有其自己的参数对象

  • FATE/python/faderatedml/param目录下新建xxx_param.py,定义继承BaseParam的参数类
  • 在init方法调用基类的init方法(super(LogisticParam, self).__init__())并添加模块使用的所有参数
  • 重载BaseParam的check方法,检查参数是否可用,对于不可用的参数实例抛出ValueError("xxx")异常。未实现check方法的参数类会在使用时抛出异常
2. 定义新模块的meta文件

向FATE-Flow模块描述如何启动算法模块程序

  • FATE/python/federatedml/components/目录下新建xxx.py作为meta文件
  • 配置文件:
    • 传入xxx定义ComponentMeta实例:xxx_cpn_meta = ComponentMeta("XXX"),xxx为小写_分隔单词,XXX为dls中调用的模块名使用大驼峰,比如:hetero_lr_cpn_meta = ComponentMeta("HeteroLR")
    • 使用装饰器xxx_cpn_meta.bind_runner.on_$role绑定模块对象到每个角色,可以将多个角色绑定到一个模块对象:xxx_cpn_meta.bind_runner.on_$role1.on_$role2.on_$role3
    • 使用装饰器xxx_cpn_meta.bind_param将参数对象绑定到定义的组件,函数返回参数对象即可
3. 定义此模块的传递变量py文件并生成传递变量对象(可选)

不同参与方之间存在信息交互时

FATE/python/federatedml/transfer_variable/transfer_class中新建py文件,新建一个继承BaseTransferVariables类的XXXTransferVariable类,定义相应的变量,并为其赋予需要的传输权限

1
2
3
4
5
6
7
from federatedml.transfer_variable.base_transfer_variable import BaseTransferVariables

class HeteroLRTransferVariable(BaseTransferVariables):
    def __init__(self, flowid=0):
        super().__init__(flowid)
        self.batch_data_index = self._create_variable(name='batch_data_index', src=['guest'], dst=['host'])
        ...
  • name:变量名
  • src:交互信息从何处发出,应为 “guest”,“host”,“arbiter” 的列表
  • dst:交互信息发送到何处,应为 “guest”,“host”,“arbiter” 的列表
4. 定义算法模块(继承 model_base)

fate_flow_client 模块的运行规则

  1. 从数据库中检索fate的组件注册信息,获取component的每个role对应的运行对象
  2. 初始化各方的运行对象
  3. 调用运行对象的 run 方法
  4. 如果需要,调用 save_data 方法或 export_model 方法

继承python/federatedml/model_base.py中的ModelBase对象,init方法中指定一些模块信息并指定参数对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def __init__(self):
  super().__init__()
  self.model_name = 'HeteroLogisticRegression'
  self.model_param_name = 'HeteroLogisticRegressionParam'
  self.model_meta_name = 'HeteroLogisticRegressionMeta'
  self.mode = consts.HETERO
  self.aggregator = None
  self.cipher = None
  self.batch_generator = None
  self.gradient_loss_operator = None
  self.converge_procedure = None
  self.model_param = HeteroLogisticParam()

需要时重载fit方法:def fit(self, train_data, validate_data=None):。fit方法是启动建模组件的训练,或者特征工程组件的fit功能的入口。接受train_data训练数据和validate验证集数据,验证集数据可不提供。该函数在用户启动训练任务时,被model_base自动调起

需要时重载 predict 方法:def predict(self, data_inst):。data_inst 是一个 Table, 用于建模组件的预测功能。在用户启动预测任务时,将被model_base自动调起,训练任务中,建模组件也会调用predict函数对训练数据和验证集数据(如果有)进行预测,并输出预测结果。后续希望接入evaluation,需要输出符合下列格式的Table:

  • 二分类,多分类,回归任务返回一张表 表的格式为: ["label", "predict_result", "predict_score", "predict_detail", "type"]
  • 聚类任务返回两张表,第一张的格式为: ["cluster_sample_count", "cluster_inner_dist", "inter_cluster_dist"],第二张表的格式为: ["predicted_cluster_index", "distance"]

需要时重载 transform 方法:def transform(self, data_inst):,data_inst 是一个 Table, 用于特征工程组件对数据进行转化功能。用户启动预测任务时,将被model_base自动调起

定义 save_data 方法:fate-flow 通过它获取输出数据。

1
2
def save_data(self):
    return self.data_output
5. 定义模型保存所需的protobuf

为了方便模型跨平台保存和读取模型,FATE使用protobuf文件定义每个模型所需的参数和模型内容。在FATE/python/federatedml/protobuf/proto/目录新建proto文件,python protobuf 教程

一个模型需要两个proto文件,后缀为meta的文件中保存某一次任务的配置,后缀为param的文件中保存某次任务的模型结果

定义完文件后执行该目录下的proto_generate.sh文件生成对应的py文件,之后可以在项目中引用proto类型

定义 export_model 方法:fate-flow 通过它获取输出的模型,模型同时包含 “Meta” 和 “Param” 产生的proto buffer类的 dict 格式,如

1
2
3
4
5
6
7
8
def export_model(self):
    meta_obj = self._get_meta()
    param_obj = self._get_param()
    result = {
        self.model_meta_name: meta_obj,
        self.model_param_name: param_obj
    }
    return result
6. 开发Pipeline组件

python/fate_client/pipeline/component中添加自己的组件,见pipline

7. 重启fate flow服务

重启fate flow从而让其发现新加的组件

1
python fate_flow_server.py --debug # debug进行代码热重载
开始建模任务
  • 第一步: 上传数据
  • 第二步: 开始建模任务
  • 第三步: 检查日志文件

FATE Pipeline

pipline官方文档

Pipeline 是一种高级 Python API,允许用户以顺序方式设计、启动和查询 FATE job作业。FATE Pipeline 设计为用户友好且与 FATE 命令行工具的行为一致。用户可以通过向管道添加组件来自定义工作流程,然后通过一次调用启动一个作业。

FATE 作业是有向无环图(dag)

FATE 作业是一个由算法组件节点组成的 dag。FATE 管道提供易于使用的工具来配置任务的顺序和设置。

FATE 以模块化风格编写。模块是具有输入和输出数据的模型。当一个模块的输出设置为另一个模块的输入时,两个模块被连接。 FATE 作业实际上是由一系列子模块组成的。

安装Pipeline

安装fate_client会自动安装pipline命令

1
pip install fate_client

使用pipline之前外面需要指定连接的FATE Flow Service,类似聚合服务器,在单机部署的Fate里面默认是127.0.0.1:9380。执行如下命令来指定连接

1
pipeline init --ip 127.0.0.1 --port 9380
pipline接口
Component组件

组件官方文档

FATE支持的模块清单

FATE 模块被包装在 component 组件中。每个组件都可以接收和输出Data 和 Model。组件的参数可以在初始化的时候设置,未指定的参数将采用默认值,所有组件都有一个 name 标识符,它在管道中必须是唯一的,建议每个组件名称都包含一个编号作为后缀,以便于跟踪。

  • Input输入:Input将所有上游输入封装到作业工作流中的组件。分为三类input:data、cache和 model。只有Intersection可能有cache输入
1
2
3
4
5
6
from pipeline.component import DataTransform

data_transform_0 = DataTransform(name="data_transform_0")
input_all = data_transform_0.input
input_data = data_transform_0.input.data
input_model = data_transform_0.input.model
  • Output输出:Output将组件的输出data、cache和 modelof 封装在一个 FATE 作业中。只有Intersection可能有cache输出
1
2
3
4
5
6
from pipeline.component import DataTransform

data_transform_0 = DataTransform(name="data_transform_0")
output_all = data_transform_0.output
output_data = data_transform_0.output.data
output_model = data_transform_0.output.model
  • Data数据:数据集在模块之间传递时被包装到data
1
pipeline.add_component(intersection_0, data=Data(data=data_transform_0.output.data))
Data Name Input Output Use Case
data Yes Yes 单一数据输入或输出
train_data Yes Yes 模型训练;DataSplit组件输出
validate_data Yes Yes 带验证集的模型训练;DataSplit组件输出
test_data No Yes DataSplit组件输出
predict_input Yes No 模型预测

组件的所有输入和输出数据在组件之间传递时都需要包装成Data对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from pipeline.backend.pipeline import Pipeline
from pipeline.component import DataTransform, Intersection, HeteroDataSplit, HeteroLR
# initialize a pipeline
pipeline = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest)
# define all components
data_transform_0 = DataTransform(name="data_transform_0")
data_split = HeteroDataSplit(name="data_split_0")
hetero_lr_0 = HeteroLR(name="hetero_lr_0", max_iter=20)
# chain together all components
pipeline.add_component(reader_0)
pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
pipeline.add_component(intersection_0, data=Data(data=data_transform_0.output.data))
pipeline.add_component(hetero_data_split_0, data=Data(data=intersection_0.output.data))
pipeline.add_component(hetero_lr_0, data=Data(train_data=hetero_data_split_0.output.data.train_data,
                                              validate_data=hetero_data_split_0.output.data.test_data))
  • Model模型:分为model和isometric_model。当前组件与前一个组件属于同一类则是model,将复制前一个组件的所有模型参数。当前组件和前一个组件不是同一类时为isometric_model。

  • Cache缓存:Cache仅适用于Intersection组件。

将intersection_0的缓存输出设置为intersection_1的缓存输入

1
pipeline.add_component(intersection_1, data=Data(data=data_transform_0.output.data), cache=Cache(intersect_0.output.cache))

使用CacheLoader组件从另一个作业加载缓存

1
2
pipeline.add_component(cache_loader_0)
pipeline.add_component(intersect_0, data=Data(data=data_transform_0.output.data), cache=Cache(cache_loader_0.output.cache))
  • Parameter参数:定义组件时可以设置所有方的参数,也可以为某一方设置单独参数
1
2
3
4
5
6
7
from pipeline.component import DataTransform
# 定义组件所有方的默认参数
data_transform_0 = DataTransform(name="data_transform_0", input_format="dense", output_format="dense",
                  outlier_replace=False)
# 为某方设置单独的参数
guest_data_transform_0 = data_transform_0.get_party_instance(role='guest', party_id=9999)
guest_data_transform_0.component_param(with_label=True)
  • Task Info任务信息:

Components可以使用 Pipeline 任务信息 API 检索输出数据和模型信息。目前 Pipeline 支持这四种类型的组件查询:

  1. get_output_data:返回下载的输出数据;使用limits参数限制来限制输出行
  2. get_output_data_table:返回输出数据表信息(包括表名和命名空间)
  3. get_model_param:返回拟合模型参数
  4. get_summary:返回模型摘要
建立管道

管道演示

初始化管道后,应使用set_initiator和set_roles指定作业发起者和参与者(arbiter为协调者,在联邦中作为可信第三方)

1
2
3
pipeline = PipeLine()
pipeline.set_initiator(role='guest', party_id=9999)
pipeline.set_roles(guest=9999, host=10000, arbiter=10000)

Reader读取数据源

1
reader_0 = Reader(name="reader_0")

DataTransform将数据转换为DataInstance格式,然后可以将其用于数据工程和模型训,组件的get_party_instance方法为不同的角色设置单独配置

1
2
3
data_transform_0 = DataTransform(name="data_transform_0")
guest_component_instance = data_transform_0.get_party_instance(role='guest', party_id=9999)
guest_component_instance.component_param(with_label=True, output_format="dense")

管道实例的add_component方法添加组件实例

1
pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
设置运行作业的FATE版本

指定FATE的版本来提交任务,未指定,将使用默认版本

管道实例的set_global_job_provider方法设置全局版本

1
pipeline.set_global_job_provider("fate@1.7.0")

组件实例的provider属性指定组件版本

1
homo_nn.provider = "fate@1.7.0"
运行管道

添加所有组件后,先编译管道,然后运行作业

1
2
pipeline.compile()
pipeline.fit()
查询任务

FATE Pipeline 提供 API 来查询组件信息,包括数据、模型和摘要。所有查询 API 都具有与 FlowPy 匹配的名称,Pipeline 检索并将查询结果直接返回给用户

1
summary = pipeline.get_component("hetero_lr_0").get_summary()
部署组件

管道fit拟合(训练)完成,就可以在新数据集上运行预测。在进行预测之前,需要先部署必要的组件,可以部署选择的组件和全部组件(Reader组件只能被指定部署)

1
2
3
4
# 部署选择的部分组件
pipeline.deploy_component([data_transform_0, hetero_lr_0])
# 部署全部组件
pipeline.deploy_component()
用管道预测

启动一个新的管道,将训练后的管道作为组件添加到其中,然后指定用于预测的数据源。

1
2
3
4
predict_pipeline = PipeLine()
predict_pipeline.add_component(reader_0)
predict_pipeline.add_component(pipeline,
                               data=Data(predict_input={pipeline.data_transform_0.input.data: reader_0.output.data}))

然后可以在新管道上启动预测

1
predict_pipeline.predict()

可以在运行预测之前向原始管道添加新组件

1
predict_pipeline.add_component(evaluation_0, data=Data(data=pipeline.hetero_lr_0.output.data))

可以指定使用训练过程中保存的检查点模块进行预测

1
predict_pipeline.predict(components_checkpoint={"hetero_lr_0": {"step_index": 8}})
管道的保存和恢复

保存管道

1
pipeline.dump("pipeline_saved.pkl")

恢复管道

1
2
from pipeline.backend.pipeline import PineLine
PipeLine.load_model_from_file("pipeline_saved.pkl")
流水线概要信息

获取管道的详细信息

1
pipeline.describe()
在线预测服务(FATE-Serving)与管道一起使用

部署组件

加载模型

1
pipeline.online.load()

将模型绑定到所选服务,可以指定选择的 FATE-Serving 地址

1
2
pipeline.online.bind("service_1")
pipeline.online.bind("service_1", "127.0.0.1")
上传数据

PipeLine 提供了上传本地数据表的功能

管道与 CLI

过去的版本中,用户通过命令行界面与 FATE 交互,使用手动配置的 conf 和 dsl json 文件

Pipeline 在编译时自动形成任务配置文件

 |