I have a problem with Spring Integration and Spring Batch. I want to pass csv file from sftp to a batch job then convert information into POJO and pass to output. How can I do this? I have next configuration:
@Configuration
@RequiredArgsConstructor
@Setter
class BatchJobConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
private final PlatformTransactionManager transactionManager;
@Bean
public Job readCSVFilesJob() {
return jobBuilderFactory
.get("readCSVFilesJob")
.incrementer(new RunIdIncrementer())
.start(step())
.build();
}
@Bean
public Step step() {
return stepBuilderFactory.get("step").<Bill, Bill>chunk(7)
.reader(reader())
.writer(writer())
.build();
}
@Bean
@StepScope
public FlatFileItemReader<Bill> reader() {
FlatFileItemReader<Bill> reader = new FlatFileItemReader<>();
reader.setResource(new FileSystemResource("/info"));
reader.setLinesToSkip(1);
reader.setStrict(false);
reader.setLineMapper(new DefaultLineMapper<Bill>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames("first-name", "last-name", "amount");
setDelimiter(";");
}
});
setFieldSetMapper(new RecordFieldSetMapper());
}
});
return reader;
}
@Bean
public ConsoleItemWriter<Bill> writer() {
return new ConsoleItemWriter<>();
}
@Bean
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(transactionManager);
factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
factory.setTablePrefix("BATCH_");
factory.setMaxVarCharLength(1000);
return factory.getObject();
}
}
Writer
public class ConsoleItemWriter<T> implements ItemWriter<T> {
@Override
public void write(List<? extends T> items) throws Exception {
for (T item : items) {
System.out.println(item);
}
}
}
Integration config
@Configuration
@RequiredArgsConstructor
public class SftpConfig {
private final Job job;
private final JobRepository jobRepository;
@Value("${sftp.host}")
private String sftpHost;
@Value("${sftp.user}")
private String sftpUser;
@Value("${sftp.password}")
private String sftpPassword;
@Value("${sftp.port}")
private int sftpPort;
@Value("${poller.trigger}")
private int pollerTrigger;
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(pollerTrigger));
return pollerMetadata;
}
@Bean
SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("/info");
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter("sftpChannel")
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("/tmp/info"));
source.setAutoCreateLocalDirectory(true);
return source;
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(sftpHost);
factory.setPort(sftpPort);
factory.setUser(sftpUser);
factory.setPassword(sftpPassword);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return message -> System.out.println("transferred");
}
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(job);
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow() {
return IntegrationFlows.from(Files.inboundAdapter(new File("/info")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
handle(fileMessageToJobRequest()).
handle(jobLaunchingGateway()).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
}
Model
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
public class Bill {
private String firstName;
private String lastName;
private String amount;
}
Mapper
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
public class RecordFieldSetMapper implements FieldSetMapper<Bill> {
@Override
public Bill mapFieldSet(FieldSet fieldSet) {
return new Bill()
.setFirstName(fieldSet.readString("first-name"))
.setLastName(fieldSet.readString("last-name"));
}
}
Request
@Setter
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
@Transformer
public JobLaunchRequest toJobLaunchRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
Individually everything works. But in output I dont see information about my records. IntegrationFlow cant call batch job.
FileMessageToJobRequest
the type defined in docs.spring.io/spring-batch/4.1.x/reference/html/… ?