searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Apache NIFI,Processor所涉及的一些方法

2023-05-29 07:23:19
39
0

使用Apache Nifi的过程中,我们所使用的最多的是NIFI中各式各样的processor,当现有processor无法满足业务需求时,我们需要自行开发自定义processor,本文介绍我们在自定义processor时用到的方法

onTrigger

        我们自定义Processor时最常用的是继承AbstractProcessor,首先看一下AbstractProcessor的继承关系: 

public abstract class AbstractProcessor extends AbstractSessionFactoryProcessor {

    // 控制器是先调用的这个onTrigger方法,然后再调用用户自定义实现的(下面的)onTrigger
    @Override
    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        final ProcessSession session = sessionFactory.createSession();
        try {
            onTrigger(context, session);
            session.commit();
        } catch (final Throwable t) {
            session.rollback(true);
            throw t;
        }
    }
    //这个onTrigger方法就是我们最常见的 需要去实现功能逻辑的了
    public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
}

UI画布上创建一个Processor的过程

        我们先看在画布上拖拽出一个新的Processor会发什么。

        在浏览器中创建一个Processor我们可以看到调用的API是  源码(部分省略)ProcessGroupResource.java如下:

/**
     * 创建一个新的processor.
     *
     * @param httpServletRequest request
     * @param groupId            The group id
     * @param requestProcessorEntity    A processorEntity.
     * @return A processorEntity.
     */
    @POST
    ...
    public Response createProcessor(
            @Context final HttpServletRequest httpServletRequest,
            @ApiParam(
                    value = "The process group id.",
                    required = true
            )
            @PathParam("id") final String groupId,
            @ApiParam(
                    value = "The processor configuration details.",
                    required = true
            ) final ProcessorEntity requestProcessorEntity) {
        final ProcessorDTO requestProcessor = requestProcessorEntity.getComponent();
        ...省略校验逻辑
        //先为Processor生成一个坐标
        final PositionDTO proposedPosition = requestProcessor.getPosition();
        ...省略坐标校验逻辑
        ...省略groupId校验逻辑
        requestProcessor.setParentGroupId(groupId);
        //集群下 复制操作请求
        if (isReplicateRequest()) {
            return replicate(HttpMethod.POST, requestProcessorEntity);
        } else if (isDisconnectedFromCluster()) {
            //集群断开的校验处理
            verifyDisconnectedNodeModification(requestProcessorEntity.isDisconnectedNodeAcknowledged());
        }
        //通过serviceFacade服务执行操作。
        return withWriteLock(
                serviceFacade,//nifi-web-api-context.xml配置文件中的StandardNiFiServiceFacade,注入到ProcessGroupResource中
                requestProcessorEntity,//POST请求传入的数据
                lookup -> {//在serviceFacade.authorizeAccess()中使用,验证当前用户创建组件的权限
                    final NiFiUser user = NiFiUserUtils.getNiFiUser();
                    final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
                    processGroup.authorize(authorizer, RequestAction.WRITE, user);
                    ComponentAuthorizable authorizable = null;
                    try {
                        authorizable = lookup.getConfigurableComponent(requestProcessor.getType(), requestProcessor.getBundle());
                        if (authorizable.isRestricted()) {
                            authorizeRestrictions(authorizer, authorizable);
                        }
                        final ProcessorConfigDTO config = requestProcessor.getConfig();
                        if (config != null && config.getProperties() != null) {
                            AuthorizeControllerServiceReference.authorizeControllerServiceReferences(config.getProperties(), authorizable, authorizer, lookup);
                        }
                    } finally {
                        if (authorizable != null) {
                            authorizable.cleanUpResources();
                        }
                    }
                },
                () -> serviceFacade.verifyCreateProcessor(requestProcessor),
                processorEntity -> {
                    final ProcessorDTO processor = processorEntity.getComponent();
                    // 设置ID
                    processor.setId(generateUuid());
                    // 创建新的processor
                    final Revision revision = getRevision(processorEntity, processor.getId());
                    final ProcessorEntity entity = serviceFacade.createProcessor(revision, groupId, processor);
                    processorResource.populateRemainingProcessorEntityContent(entity);
                    // 返回201
                    String uri = entity.getUri();
                    return generateCreatedResponse(URI.create(uri), entity).build();
                }
        );
    }

核心代码是serviceFacade.createProcessor(revision, groupId, processor);

onScheduled

它的意思是指示在计划运行组件时调用此方法。它将在任何对“onTrigger”的调用之前被调用,并且将在计划运行组件时被调用一次。

onScheduled方法一般都是被注解@onScheduled标注了的,没有这样的需求的组件可以不用写。

举个例子GenerateFlowFile组件中的onScheduled

    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        //在调用onTrigger方法之前先判断UNIQUE_FLOWFILES配置是否为true,如果为true就设置data为null
        if (context.getProperty(UNIQUE_FLOWFILES).asBoolean()) {
            this.data.set(null);
        } else if(!context.getProperty(CUSTOM_TEXT).isSet()) {
            this.data.set(generateData(context));
        }
    }


setup

里面通常放一些配置类代码,比如每次调度对state的操作

customValidate

这个也简单说一下,我们在编写Processor properties(supported和dynamic properties)的时候,会为properties也添加一些属性,比如说默认值、是否必须、校验器等等。架构提供了默认的validate方法去校验组件的这些properties是否合规,如果用户需要在常规检验外还需要自己特殊的校验,就要在Processor覆盖实现customValidate方法了,当然customValidate只有在常规检验成功后才会被调用

0条评论
作者已关闭评论
Ashley3
1文章数
0粉丝数
Ashley3
1 文章 | 0 粉丝
Ashley3
1文章数
0粉丝数
Ashley3
1 文章 | 0 粉丝
原创

Apache NIFI,Processor所涉及的一些方法

2023-05-29 07:23:19
39
0

使用Apache Nifi的过程中,我们所使用的最多的是NIFI中各式各样的processor,当现有processor无法满足业务需求时,我们需要自行开发自定义processor,本文介绍我们在自定义processor时用到的方法

onTrigger

        我们自定义Processor时最常用的是继承AbstractProcessor,首先看一下AbstractProcessor的继承关系: 

public abstract class AbstractProcessor extends AbstractSessionFactoryProcessor {

    // 控制器是先调用的这个onTrigger方法,然后再调用用户自定义实现的(下面的)onTrigger
    @Override
    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        final ProcessSession session = sessionFactory.createSession();
        try {
            onTrigger(context, session);
            session.commit();
        } catch (final Throwable t) {
            session.rollback(true);
            throw t;
        }
    }
    //这个onTrigger方法就是我们最常见的 需要去实现功能逻辑的了
    public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
}

UI画布上创建一个Processor的过程

        我们先看在画布上拖拽出一个新的Processor会发什么。

        在浏览器中创建一个Processor我们可以看到调用的API是  源码(部分省略)ProcessGroupResource.java如下:

/**
     * 创建一个新的processor.
     *
     * @param httpServletRequest request
     * @param groupId            The group id
     * @param requestProcessorEntity    A processorEntity.
     * @return A processorEntity.
     */
    @POST
    ...
    public Response createProcessor(
            @Context final HttpServletRequest httpServletRequest,
            @ApiParam(
                    value = "The process group id.",
                    required = true
            )
            @PathParam("id") final String groupId,
            @ApiParam(
                    value = "The processor configuration details.",
                    required = true
            ) final ProcessorEntity requestProcessorEntity) {
        final ProcessorDTO requestProcessor = requestProcessorEntity.getComponent();
        ...省略校验逻辑
        //先为Processor生成一个坐标
        final PositionDTO proposedPosition = requestProcessor.getPosition();
        ...省略坐标校验逻辑
        ...省略groupId校验逻辑
        requestProcessor.setParentGroupId(groupId);
        //集群下 复制操作请求
        if (isReplicateRequest()) {
            return replicate(HttpMethod.POST, requestProcessorEntity);
        } else if (isDisconnectedFromCluster()) {
            //集群断开的校验处理
            verifyDisconnectedNodeModification(requestProcessorEntity.isDisconnectedNodeAcknowledged());
        }
        //通过serviceFacade服务执行操作。
        return withWriteLock(
                serviceFacade,//nifi-web-api-context.xml配置文件中的StandardNiFiServiceFacade,注入到ProcessGroupResource中
                requestProcessorEntity,//POST请求传入的数据
                lookup -> {//在serviceFacade.authorizeAccess()中使用,验证当前用户创建组件的权限
                    final NiFiUser user = NiFiUserUtils.getNiFiUser();
                    final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
                    processGroup.authorize(authorizer, RequestAction.WRITE, user);
                    ComponentAuthorizable authorizable = null;
                    try {
                        authorizable = lookup.getConfigurableComponent(requestProcessor.getType(), requestProcessor.getBundle());
                        if (authorizable.isRestricted()) {
                            authorizeRestrictions(authorizer, authorizable);
                        }
                        final ProcessorConfigDTO config = requestProcessor.getConfig();
                        if (config != null && config.getProperties() != null) {
                            AuthorizeControllerServiceReference.authorizeControllerServiceReferences(config.getProperties(), authorizable, authorizer, lookup);
                        }
                    } finally {
                        if (authorizable != null) {
                            authorizable.cleanUpResources();
                        }
                    }
                },
                () -> serviceFacade.verifyCreateProcessor(requestProcessor),
                processorEntity -> {
                    final ProcessorDTO processor = processorEntity.getComponent();
                    // 设置ID
                    processor.setId(generateUuid());
                    // 创建新的processor
                    final Revision revision = getRevision(processorEntity, processor.getId());
                    final ProcessorEntity entity = serviceFacade.createProcessor(revision, groupId, processor);
                    processorResource.populateRemainingProcessorEntityContent(entity);
                    // 返回201
                    String uri = entity.getUri();
                    return generateCreatedResponse(URI.create(uri), entity).build();
                }
        );
    }

核心代码是serviceFacade.createProcessor(revision, groupId, processor);

onScheduled

它的意思是指示在计划运行组件时调用此方法。它将在任何对“onTrigger”的调用之前被调用,并且将在计划运行组件时被调用一次。

onScheduled方法一般都是被注解@onScheduled标注了的,没有这样的需求的组件可以不用写。

举个例子GenerateFlowFile组件中的onScheduled

    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        //在调用onTrigger方法之前先判断UNIQUE_FLOWFILES配置是否为true,如果为true就设置data为null
        if (context.getProperty(UNIQUE_FLOWFILES).asBoolean()) {
            this.data.set(null);
        } else if(!context.getProperty(CUSTOM_TEXT).isSet()) {
            this.data.set(generateData(context));
        }
    }


setup

里面通常放一些配置类代码,比如每次调度对state的操作

customValidate

这个也简单说一下,我们在编写Processor properties(supported和dynamic properties)的时候,会为properties也添加一些属性,比如说默认值、是否必须、校验器等等。架构提供了默认的validate方法去校验组件的这些properties是否合规,如果用户需要在常规检验外还需要自己特殊的校验,就要在Processor覆盖实现customValidate方法了,当然customValidate只有在常规检验成功后才会被调用

文章来自个人专栏
文章 | 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0