0

I have a problem with Spring Integration and Spring Batch. I want to pass csv file from sftp to a batch job then convert information into POJO and pass to output. How can I do this? I have next configuration:

@Configuration
@RequiredArgsConstructor
@Setter
class BatchJobConfig {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private final PlatformTransactionManager transactionManager;

    @Bean
    public Job readCSVFilesJob() {
        return jobBuilderFactory
                .get("readCSVFilesJob")
                .incrementer(new RunIdIncrementer())
                .start(step())
                .build();
    }

    @Bean
    public Step step() {
        return stepBuilderFactory.get("step").<Bill, Bill>chunk(7)
                .reader(reader())
                .writer(writer())
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Bill> reader() {
        FlatFileItemReader<Bill> reader = new FlatFileItemReader<>();
        reader.setResource(new FileSystemResource("/info"));
        reader.setLinesToSkip(1);
        reader.setStrict(false);
        reader.setLineMapper(new DefaultLineMapper<Bill>() {{
                setLineTokenizer(new DelimitedLineTokenizer() {{
                        setNames("first-name", "last-name", "amount");
                        setDelimiter(";");
                    }
                });

                setFieldSetMapper(new RecordFieldSetMapper());
            }
        });
        return reader;
    }

    @Bean
    public ConsoleItemWriter<Bill> writer() {
        return new ConsoleItemWriter<>();
    }

    @Bean
    protected JobRepository createJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionManager(transactionManager);
        factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        factory.setTablePrefix("BATCH_");
        factory.setMaxVarCharLength(1000);
        return factory.getObject();
    }
}

Writer

public class ConsoleItemWriter<T> implements ItemWriter<T> {
    @Override
    public void write(List<? extends T> items) throws Exception {
        for (T item : items) {
            System.out.println(item);
        }
    }
}

Integration config

@Configuration
@RequiredArgsConstructor
public class SftpConfig {
    private final Job job;
    private final JobRepository jobRepository;

    @Value("${sftp.host}")
    private String sftpHost;

    @Value("${sftp.user}")
    private String sftpUser;

    @Value("${sftp.password}")
    private String sftpPassword;

    @Value("${sftp.port}")
    private int sftpPort;

    @Value("${poller.trigger}")
    private int pollerTrigger;

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(pollerTrigger));
        return pollerMetadata;
    }

    @Bean
    SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("/info");
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter("sftpChannel")
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("/tmp/info"));
        source.setAutoCreateLocalDirectory(true);
        return source;
    }

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(sftpHost);
        factory.setPort(sftpPort);
        factory.setUser(sftpUser);
        factory.setPassword(sftpPassword);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return message -> System.out.println("transferred");
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest() {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setFileParameterName("input.file.name");
        fileMessageToJobRequest.setJob(job);
        return fileMessageToJobRequest;
    }

    @Bean
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

        return jobLaunchingGateway;
    }

    @Bean
    public IntegrationFlow integrationFlow() {
        return IntegrationFlows.from(Files.inboundAdapter(new File("/info")).
                        filter(new SimplePatternFileListFilter("*.csv")),
                c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
                handle(fileMessageToJobRequest()).
                handle(jobLaunchingGateway()).
                log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
                get();
    }
}

Model

@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
public class Bill {
    private String firstName;
    private String lastName;
    private String amount;
}

Mapper

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;

public class RecordFieldSetMapper implements FieldSetMapper<Bill> {

    @Override
    public Bill mapFieldSet(FieldSet fieldSet) {

        return new Bill()
                .setFirstName(fieldSet.readString("first-name"))
                .setLastName(fieldSet.readString("last-name"));
    }
}

Request

@Setter
public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    @Transformer
    public JobLaunchRequest toJobLaunchRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

Individually everything works. But in output I dont see information about my records. IntegrationFlow cant call batch job.

6
  • Is FileMessageToJobRequest the type defined in docs.spring.io/spring-batch/4.1.x/reference/html/… ? Commented Nov 2, 2018 at 15:17
  • Yes, of course, forgot to add. I have updated my question.
    – Andrew
    Commented Nov 2, 2018 at 15:21
  • ok thanks. By just looking at the code, I don't see what could go wrong. Do you have a MVCE project I can use to reproduce the issue? Otherwise I will have to create it to debug the problem. Commented Nov 2, 2018 at 21:12
  • Unfortunately, there is no access to the project now. In fact, this is all the code. All that is needed is to create a project and find a sftp server where to put the csv file with the specified fields. And you can try to reproduce the problem.
    – Andrew
    Commented Nov 2, 2018 at 21:37
  • ok no worries, I will create the sample myself. I can probably use a FileInboundChannelAdapter to monitor a directory in the file system without needing an sftp server. I guess the result will be the same. I'll be back when I have an answer. Commented Nov 2, 2018 at 21:42

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.