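When working with Apache NiFi, the components we use most are its many processors. When the existing processors cannot meet a business requirement, we have to develop our own custom processor. This article introduces the methods we use when writing a custom processor.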
onTrigger
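The most common way to write a custom Processor is to extend AbstractProcessor. First, let's look at AbstractProcessor itself, which extends AbstractSessionFactoryProcessor (simplified):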
public abstract class AbstractProcessor extends AbstractSessionFactoryProcessor {

    // The framework calls this onTrigger first; it creates a session and then calls the
    // user-implemented onTrigger below, committing on success and rolling back on failure
    @Override
    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        final ProcessSession session = sessionFactory.createSession();
        try {
            onTrigger(context, session);
            session.commit();
        } catch (final Throwable t) {
            // undo everything done in this session (penalizing the FlowFiles), then rethrow
            session.rollback(true);
            throw t;
        }
    }

    // This is the onTrigger we normally implement: it holds the processor's business logic
    public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
}
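The commit/rollback contract above is easy to overlook, so here is a self-contained sketch of the same template-method structure. It does not use the NiFi API: ModelSession, ModelSessionFactory and ModelAbstractProcessor are hypothetical stand-ins, reduced to the parts that show why a subclass's onTrigger does not need to commit the session itself. The final wrapper commits when user code returns normally and rolls back with penalization on any Throwable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ProcessSession: it only records what happened.
class ModelSession {
    final List<String> events = new ArrayList<>();

    void commit() {
        events.add("commit");
    }

    void rollback(boolean penalize) {
        events.add("rollback(penalize=" + penalize + ")");
    }
}

// Hypothetical stand-in for ProcessSessionFactory.
interface ModelSessionFactory {
    ModelSession createSession();
}

// Same shape as AbstractProcessor: a final framework-facing onTrigger wraps
// the abstract, user-implemented onTrigger in commit/rollback handling.
abstract class ModelAbstractProcessor {
    public final void onTrigger(final ModelSessionFactory sessionFactory) {
        final ModelSession session = sessionFactory.createSession();
        try {
            onTrigger(session);
            session.commit();          // reached only if user code returned normally
        } catch (final Throwable t) {
            session.rollback(true);    // undo the session's work and penalize
            throw t;                   // precise rethrow: only unchecked throwables get here
        }
    }

    // The business logic a subclass implements; it does not commit by itself.
    protected abstract void onTrigger(ModelSession session);
}
```

In a real processor, the success case corresponds to simply returning from onTrigger(context, session) after transferring FlowFiles, while throwing a ProcessException leads to the rollback branch.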
How a Processor is created on the UI canvas
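Let's first look at what happens when a new Processor is dragged onto the canvas.
When a Processor is created in the browser, the UI sends a POST request to the REST endpoint /nifi-api/process-groups/{id}/processors, which is handled by createProcessor in ProcessGroupResource.java (source below, partly omitted):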
/**
 * Creates a new processor.
 *
 * @param httpServletRequest request
 * @param groupId The group id
 * @param requestProcessorEntity A processorEntity.
 * @return A processorEntity.
 */
@POST
// ...
public Response createProcessor(
        @Context final HttpServletRequest httpServletRequest,
        @ApiParam(
                value = "The process group id.",
                required = true
        )
        @PathParam("id") final String groupId,
        @ApiParam(
                value = "The processor configuration details.",
                required = true
        ) final ProcessorEntity requestProcessorEntity) {

    final ProcessorDTO requestProcessor = requestProcessorEntity.getComponent();
    // ... validation logic omitted

    // read the proposed canvas position of the new Processor from the request
    final PositionDTO proposedPosition = requestProcessor.getPosition();
    // ... position validation omitted
    // ... groupId validation omitted
    requestProcessor.setParentGroupId(groupId);

    // in a cluster, replicate the request across the cluster nodes
    if (isReplicateRequest()) {
        return replicate(HttpMethod.POST, requestProcessorEntity);
    } else if (isDisconnectedFromCluster()) {
        // this node is disconnected from the cluster: the change is only allowed if acknowledged
        verifyDisconnectedNodeModification(requestProcessorEntity.isDisconnectedNodeAcknowledged());
    }

    // perform the operation through the serviceFacade
    return withWriteLock(
            serviceFacade,          // the StandardNiFiServiceFacade configured in nifi-web-api-context.xml and injected into ProcessGroupResource
            requestProcessorEntity, // the data sent in the POST request
            lookup -> {             // used in serviceFacade.authorizeAccess() to check that the current user may create the component
                final NiFiUser user = NiFiUserUtils.getNiFiUser();
                final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
                processGroup.authorize(authorizer, RequestAction.WRITE, user);

                ComponentAuthorizable authorizable = null;
                try {
                    authorizable = lookup.getConfigurableComponent(requestProcessor.getType(), requestProcessor.getBundle());
                    if (authorizable.isRestricted()) {
                        authorizeRestrictions(authorizer, authorizable);
                    }

                    final ProcessorConfigDTO config = requestProcessor.getConfig();
                    if (config != null && config.getProperties() != null) {
                        AuthorizeControllerServiceReference.authorizeControllerServiceReferences(config.getProperties(), authorizable, authorizer, lookup);
                    }
                } finally {
                    if (authorizable != null) {
                        authorizable.cleanUpResources();
                    }
                }
            },
            // verify that the processor can be created
            () -> serviceFacade.verifyCreateProcessor(requestProcessor),
            processorEntity -> {
                final ProcessorDTO processor = processorEntity.getComponent();

                // set the ID (generated on the server side)
                processor.setId(generateUuid());

                // create the new processor
                final Revision revision = getRevision(processorEntity, processor.getId());
                final ProcessorEntity entity = serviceFacade.createProcessor(revision, groupId, processor);
                processorResource.populateRemainingProcessorEntityContent(entity);

                // return 201 Created
                String uri = entity.getUri();
                return generateCreatedResponse(URI.create(uri), entity).build();
            }
    );
}
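To see the request side of this endpoint, the sketch below builds (but does not send) the kind of POST the UI issues, using java.net.http.HttpRequest (Java 11+). The JSON shape is an assumption based on the code above: a ProcessorEntity whose component carries the processor type and a position, with revision version 0 (assumed to be what a new component needs) and no id, since the server assigns the id itself via generateUuid(). The base URL and group id are placeholders, and the optional bundle field is left out.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Builds (but does not send) a create-processor request like the one the UI issues.
class CreateProcessorRequest {
    // Minimal ProcessorEntity JSON (assumed shape): revision version 0, no "id"
    // because the server generates it, and no "bundle" (left out for brevity).
    static String body(String type, int x, int y) {
        return "{\"revision\":{\"version\":0},"
                + "\"component\":{\"type\":\"" + type + "\","
                + "\"position\":{\"x\":" + x + ",\"y\":" + y + "}}}";
    }

    // POST {baseUrl}/process-groups/{groupId}/processors with a JSON body.
    static HttpRequest build(String baseUrl, String groupId, String json) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/process-groups/" + groupId + "/processors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }
}
```

Sending it with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) against an unsecured instance should produce the 201 response built by generateCreatedResponse above; a secured instance additionally needs authentication, which this sketch does not cover.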
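The core call is serviceFacade.createProcessor(revision, groupId, processor); it is what actually creates the processor in the given process group and returns the resulting ProcessorEntity.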
onScheduled
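A method annotated with @OnScheduled is invoked whenever the component is scheduled to run. It is called before any call to onTrigger, and exactly once each time the component is scheduled, so it runs again after every stop and restart.
The method is usually named onScheduled, but the framework locates it by the @OnScheduled annotation (note the capital O), not by its name. For a Processor it may take no arguments or a single ProcessContext. Components that need no such preparation can simply leave it out.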
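How the framework finds and calls such a method can be illustrated with a small, self-contained model. ModelOnScheduled, ModelTriggerable and ModelScheduler are hypothetical stand-ins for @OnScheduled and NiFi's scheduler: on every start the scheduler invokes each annotated public method once, located by annotation rather than by name, and only then starts triggering. It is a simplification; the real framework also passes the ProcessContext and handles failures of the annotated method.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @OnScheduled; RUNTIME retention makes it visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ModelOnScheduled {}

interface ModelTriggerable {
    void trigger();
}

// Toy scheduler: each start() first calls every @ModelOnScheduled method once,
// then calls trigger() the requested number of times.
class ModelScheduler {
    static void start(ModelTriggerable component, int triggerCount) {
        for (Method m : component.getClass().getMethods()) {
            if (m.isAnnotationPresent(ModelOnScheduled.class)) {
                try {
                    m.invoke(component);      // located by annotation, not by method name
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        for (int i = 0; i < triggerCount; i++) {
            component.trigger();
        }
    }
}

class LifecycleDemo implements ModelTriggerable {
    final List<String> log = new ArrayList<>();

    @ModelOnScheduled
    public void prepareAnyName() {
        log.add("scheduled");
    }

    @Override
    public void trigger() {
        log.add("trigger");
    }
}
```

Starting the demo twice (two triggers, then one) yields the log scheduled, trigger, trigger, scheduled, trigger: the annotated method runs once per scheduling and always before the first trigger.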
Take the onScheduled method of GenerateFlowFile as an example:
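@OnScheduled
public void onScheduled(final ProcessContext context) {
    // Runs before any onTrigger call. If Unique FlowFiles is true, clear the cached data:
    // onTrigger will then generate new data for every FlowFile
    if (context.getProperty(UNIQUE_FLOWFILES).asBoolean()) {
        this.data.set(null);
    } else if (!context.getProperty(CUSTOM_TEXT).isSet()) {
        // otherwise, if no Custom Text is set, generate the data once and reuse it
        this.data.set(generateData(context));
    }
}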
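The effect of this caching can be seen in a simplified, self-contained model. GenerateModel is hypothetical: the Unique FlowFiles and Custom Text properties become constructor arguments, and the random-data generator is an injected Supplier so the behavior is deterministic. The onTrigger branch order is our reading of GenerateFlowFile: unique data is regenerated per FlowFile, custom text is used when set, and otherwise the array cached in onScheduled is reused.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Simplified, hypothetical model of GenerateFlowFile's data caching.
class GenerateModel {
    private final boolean uniqueFlowFiles;        // "Unique FlowFiles" property
    private final String customText;              // "Custom Text"; null means not set
    private final Supplier<byte[]> generator;     // stands in for generateData(context)
    private final AtomicReference<byte[]> data = new AtomicReference<>();

    GenerateModel(boolean uniqueFlowFiles, String customText, Supplier<byte[]> generator) {
        this.uniqueFlowFiles = uniqueFlowFiles;
        this.customText = customText;
        this.generator = generator;
    }

    // Called once per scheduling, before any onTrigger().
    void onScheduled() {
        if (uniqueFlowFiles) {
            data.set(null);                 // nothing cached: each FlowFile gets fresh data
        } else if (customText == null) {
            data.set(generator.get());      // generate once, reuse for every FlowFile
        }
    }

    // Returns the content a FlowFile would receive on this trigger.
    byte[] onTrigger() {
        if (uniqueFlowFiles) {
            return generator.get();
        } else if (customText != null) {
            return customText.getBytes(StandardCharsets.UTF_8);
        }
        return data.get();
    }
}
```

Doing the expensive generation once in onScheduled, rather than on every trigger, is the main reason this logic lives outside onTrigger.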
setup
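setup is not a lifecycle method defined by AbstractProcessor; it is typically the name given to a method annotated with @OnScheduled. Such a method usually holds configuration-type code, for example operations on the processor's state that must run each time the processor is scheduled.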
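A minimal sketch of that pattern, assuming a processor that remembers the highest id it has processed. In a real processor the state would come from context.getStateManager(), read with getState(Scope.LOCAL or Scope.CLUSTER) and written with setState(...); here ModelStateStore is a hypothetical in-memory stand-in, the state key is made up, and setup() is the method that would carry @OnScheduled, so a stopped and restarted processor resumes from the stored value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the processor's StateManager.
class ModelStateStore {
    private Map<String, String> state = new HashMap<>();

    Map<String, String> getState() {
        return new HashMap<>(state);
    }

    void setState(Map<String, String> newState) {
        state = new HashMap<>(newState);
    }
}

// Remembers the highest id processed so far, across restarts.
class IncrementalModel {
    private static final String KEY = "last.max.id";   // hypothetical state key
    private final ModelStateStore store;
    private long lastMaxId;

    IncrementalModel(ModelStateStore store) {
        this.store = store;
    }

    // In a real processor this method would carry @OnScheduled: reload state on every start.
    void setup() {
        String stored = store.getState().get(KEY);
        lastMaxId = (stored == null) ? 0L : Long.parseLong(stored);
    }

    // Handles only ids above the watermark, then persists the new watermark.
    int onTrigger(long[] availableIds) {
        long newMax = lastMaxId;
        int processed = 0;
        for (long id : availableIds) {
            if (id > lastMaxId) {
                processed++;
                newMax = Math.max(newMax, id);
            }
        }
        lastMaxId = newMax;
        Map<String, String> newState = new HashMap<>();
        newState.put(KEY, Long.toString(newMax));
        store.setState(newState);
        return processed;
    }
}
```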
customValidate
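A brief word on this one too. When we define a Processor's properties (both supported and dynamic properties), we give each property attributes such as a default value, whether it is required, and validators. The framework's default validate method checks that the component's properties satisfy these rules. If a Processor needs checks beyond these standard ones, for example a constraint between two properties, it overrides customValidate. Note that customValidate is only called after the standard validation has succeeded, so it does not need to re-check that individual property values are well-formed.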
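The ordering rule can be sketched without NiFi classes. ModelConfigurableComponent and ModelValidationResult are hypothetical stand-ins for the framework's validate logic and ValidationResult (in a real processor you would override customValidate(ValidationContext) and build results with ValidationResult.Builder). The example processor and its property names are made up; it checks the cross-property rule Minimum Size <= Maximum Size and can parse both values without null checks precisely because customValidate only runs after the per-property checks pass.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ValidationResult (only invalid results are recorded).
final class ModelValidationResult {
    final String subject;
    final String explanation;

    ModelValidationResult(String subject, String explanation) {
        this.subject = subject;
        this.explanation = explanation;
    }
}

// Hypothetical stand-in for the framework's validate(): per-property checks first,
// customValidate() only when those produced no errors.
abstract class ModelConfigurableComponent {
    final List<ModelValidationResult> validate(Map<String, String> props) {
        List<ModelValidationResult> results = new ArrayList<>();
        for (String name : requiredIntegerProperties()) {
            String value = props.get(name);
            if (value == null) {
                results.add(new ModelValidationResult(name, "is required"));
            } else if (!value.matches("\\d{1,18}")) {
                results.add(new ModelValidationResult(name, "must be a non-negative integer"));
            }
        }
        if (results.isEmpty()) {
            results.addAll(customValidate(props));
        }
        return results;
    }

    abstract List<String> requiredIntegerProperties();

    // Default: no extra rules.
    Collection<ModelValidationResult> customValidate(Map<String, String> props) {
        return List.of();
    }
}

class SizeRangeProcessor extends ModelConfigurableComponent {
    @Override
    List<String> requiredIntegerProperties() {
        return List.of("Minimum Size", "Maximum Size");
    }

    // Cross-property rule; parsing is safe because per-property checks already passed.
    @Override
    Collection<ModelValidationResult> customValidate(Map<String, String> props) {
        long min = Long.parseLong(props.get("Minimum Size"));
        long max = Long.parseLong(props.get("Maximum Size"));
        if (min > max) {
            return List.of(new ModelValidationResult("Minimum Size", "must not exceed Maximum Size"));
        }
        return List.of();
    }
}
```

If the standard checks fail (say Minimum Size is "abc"), customValidate is skipped entirely, which is why it never has to guard against a NumberFormatException.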