0

Question regarding our Spring Boot 2.3.8.RELEASE implementation of @Transactional. The requirement is to implement distributed transactions that write to an instance of PostgreSQL and to ActiveMQ Artemis queues. If one commit fails, then so should the other. We are using Atomikos as our JTA Transaction Manager.

I think I have implemented everything I need, but clearly not. When I throw an Exception in my service code to test the rollback functionality, it clearly does not work: The message is written to ActiveMQ Artemis even after I throw an exception in the service code.

Any help with diagnosing and fixing would be much appreciated. If any additional details are required, please let me know.

Please find the details of the implementation below:

Spring Boot Application:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.JmsAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.artemis.ArtemisAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;

import xxx.xxx.Users;
import xxx.xxx.TransactionServiceImpl;

/**
 * Spring Boot entry point that also acts as a {@link CommandLineRunner}.
 *
 * <p>The data source, JPA, JDBC transaction manager and JMS auto-configurations are
 * excluded explicitly, so those beans have to be declared by hand elsewhere in the
 * application.
 */
@SpringBootApplication(
        exclude = {
                DataSourceAutoConfiguration.class,
                HibernateJpaAutoConfiguration.class,
                DataSourceTransactionManagerAutoConfiguration.class,
                JmsAutoConfiguration.class,
                ActiveMQAutoConfiguration.class,
                ArtemisAutoConfiguration.class
        }
)
public class BApplication implements CommandLineRunner
{
    /**
     * Starts the application context, blocks until something can be read from standard
     * input (or end-of-stream is reached), then closes the context.
     *
     * @param args command-line arguments forwarded to {@link SpringApplication#run}
     * @throws Exception if start-up fails or standard input cannot be read
     */
    public static void main(String[] args) throws Exception
    {
        // try-with-resources closes the context even when reading from stdin fails.
        try (ConfigurableApplicationContext ctx = SpringApplication.run(BApplication.class, args))
        {
            System.in.read();
        }
    }

    @Autowired
    TransactionServiceImpl tsi;

    /**
     * Builds a sample {@link Users} record and hands it to the transactional service.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the service call fails
     */
    @Override
    public void run(String... args) throws Exception
    {
        Users user = new Users();
        user.setFirstName("Moe");
        user.setGender("M");
        user.setLastName("Moe");
        tsi.save(user);
    }
}

Here is the JTA Configuration:

JTA Configuration

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.sql.DataSource;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import java.util.Properties;

import javax.annotation.PostConstruct;

import org.springframework.context.annotation.Bean;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.DependsOn;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;

import com.atomikos.icatch.config.UserTransactionService;
import com.atomikos.icatch.config.UserTransactionServiceImp;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.atomikos.jms.AtomikosConnectionFactoryBean;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.postgresql.xa.PGXADataSource;

/**
 * Declares the Atomikos JTA infrastructure together with the XA-capable PostgreSQL
 * data source and ActiveMQ Artemis connection factory that take part in it.
 *
 * <p>Connection settings are injected from the {@code amq.broker.*} and
 * {@code postgresql.*} properties.
 */
@Configuration("jtaConfig")
public class JtaConfig
{
    private static final Logger appLogger = LoggerFactory.getLogger(JtaConfig.class);

    @Value("${amq.broker.url}")
    private String brokerUrl;

    @Value("${amq.broker.username}")
    private String brokerUsername;

    @Value("${amq.broker.password}")
    private String brokerPassword;
    
    @Value("${postgresql.datasource.url}")
    String dataSourceUrl;

    @Value("${postgresql.datasource.username}")
    String dsUsername;

    @Value("${postgresql.datasource.password}")
    String dsPassword;

    @Value("${postgresql.datasource.driver.classname}")
    String dsClassName;
    
    @Value("${postgresql.initial.connections}")
    int initialDSConnections;
    
    @Value("${postgresql.max.connections}")
    int maxDSConnections;
    
    /**
     * Creates the Atomikos transaction service using the standalone service factory.
     * Spring calls {@code init} on start-up and {@code shutdownForce} on shutdown.
     *
     * @return the Atomikos user transaction service
     */
    @Bean(initMethod = "init", destroyMethod = "shutdownForce")
    public UserTransactionService userTransactionService()
    {
        Properties atProps = new Properties();
        atProps.put("com.atomikos.icatch.service", "com.atomikos.icatch.standalone.UserTransactionServiceFactory");
        return new UserTransactionServiceImp(atProps);
    }

    /**
     * Creates the Atomikos JTA transaction manager.
     *
     * @return the transaction manager, configured not to start the transaction service
     *         itself because the {@code userTransactionService} bean is initialised first
     */
    @Bean (initMethod = "init", destroyMethod = "close")
    @DependsOn("userTransactionService")
    public UserTransactionManager atomikosTransactionManager()
    {
        UserTransactionManager utm = new UserTransactionManager();
        utm.setStartupTransactionService(false);
        utm.setForceShutdown(true);
        return utm;
    }

    /**
     * Creates the JTA {@link UserTransaction} with a timeout of 1000
     * (JTA expresses transaction timeouts in seconds).
     *
     * @return the configured user transaction
     * @throws IllegalStateException if the timeout cannot be applied
     */
    @Bean
    @DependsOn("userTransactionService")
    public UserTransaction userTransaction()
    {
        UserTransactionImp ut = new UserTransactionImp();
        try
        {
            ut.setTransactionTimeout(1000);
        }
        catch (SystemException _e)
        {
            appLogger.error("Configuration exception.", _e);
            // Fail fast: a null bean here would only surface later as an obscure error.
            throw new IllegalStateException("Unable to set the JTA user transaction timeout.", _e);
        }
        return ut;
    }
    
    /**
     * Hibernate settings handed to the entity manager factory.
     *
     * @return the Hibernate properties (PostgreSQL dialect, schema create-drop, SQL logging,
     *         fetch depth and JDBC batch/fetch sizes)
     */
    @Bean
    public Properties hibernateProperties()
    {
        Properties hibernateProp = new Properties();
        hibernateProp.put("hibernate.dialect", "org.hibernate.dialect.PostgreSQLDialect");
        hibernateProp.put("hibernate.hbm2ddl.auto", "create-drop");
        hibernateProp.put("hibernate.show_sql", true);
        hibernateProp.put("hibernate.max_fetch_depth", 3);
        hibernateProp.put("hibernate.jdbc.batch_size", 10);
        hibernateProp.put("hibernate.jdbc.fetch_size", 50);
        return hibernateProp;
    }

    /**
     * @return the Hibernate JPA vendor adapter
     */
    @Bean
    public JpaVendorAdapter jpaVendorAdapter()
    {
        return new HibernateJpaVendorAdapter();
    } 
    
    /**
     * Creates the pooled, XA-aware PostgreSQL data source.
     *
     * @return an Atomikos data source wrapping a {@link PGXADataSource}
     */
    @Primary
    @Bean(name = "pgDataSource1", initMethod = "init", destroyMethod = "close")
    public DataSource pgDataSource1()
    {
        PGXADataSource primaryXaDataSource = new PGXADataSource();
        primaryXaDataSource.setUrl(dataSourceUrl);
        primaryXaDataSource.setUser(dsUsername);
        primaryXaDataSource.setPassword(dsPassword);

        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(primaryXaDataSource);
        xaDataSource.setUniqueResourceName("primaryXaDs1");
        xaDataSource.setMinPoolSize(initialDSConnections);
        xaDataSource.setMaxPoolSize(maxDSConnections);
        return xaDataSource;
    }

    /**
     * Creates the XA-aware JMS connection factory for the Artemis broker.
     *
     * @return an Atomikos connection factory wrapping the Artemis connection factory
     * @throws IllegalStateException if the broker URL is rejected by the Artemis client
     */
    @Primary
    @Bean(name = "jmsConnectionFactory", initMethod = "init", destroyMethod = "close")
    public ConnectionFactory connectionFactory()
    {
        AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
        ActiveMQConnectionFactory activeMqXaConnectionFactory = new ActiveMQConnectionFactory(); 
        try
        {
            activeMqXaConnectionFactory.setBrokerURL(brokerUrl);
            activeMqXaConnectionFactory.setUser(brokerUsername);
            activeMqXaConnectionFactory.setPassword(brokerPassword);
            atomikosConnectionFactoryBean.setUniqueResourceName("jmsXAConnectionFactory");
            atomikosConnectionFactoryBean.setLocalTransactionMode(false);
            atomikosConnectionFactoryBean.setXaConnectionFactory(activeMqXaConnectionFactory);
        }
        catch (JMSException _e)
        {
            appLogger.error("JMS Configuration Error: " + _e, _e);
            // Do not hand back a half-configured factory; stop start-up with the real cause.
            throw new IllegalStateException("Unable to configure the Artemis XA connection factory.", _e);
        }
        return atomikosConnectionFactoryBean;
    }

    /**
     * Logs the injected connection settings once the configuration has been constructed.
     * Passwords are masked so that credentials never reach the log output.
     */
    @PostConstruct
    public void postConstructDetails()
    {
        appLogger.info("Post Construct Start: JtaConfig.");
        appLogger.info("  - JMS: Artemis URL: {}", brokerUrl);
        appLogger.info("  - Artemis Username: {}", brokerUsername);
        appLogger.info("  - Artemis Password: {}", mask(brokerPassword));
        appLogger.info("  - DS: PostgreSQL URL: {}", dataSourceUrl);
        appLogger.info("  - DS: PostgreSQL Username: {}", dsUsername);
        appLogger.info("  - DS: PostgreSQL Password: {}", mask(dsPassword));
        appLogger.info("  - DS: PostgreSQL Min Conn: {}", initialDSConnections);
        appLogger.info("  - DS: PostgreSQL Max Conn: {}", maxDSConnections);
        appLogger.info("Post Construct End: JtaConfig.");
        appLogger.info(" ");
    }

    /**
     * Replaces a secret with a placeholder that only reveals whether it was provided.
     *
     * @param secret the secret value, may be {@code null}
     * @return {@code "<not set>"} for a null or empty secret, otherwise {@code "****"}
     */
    private static String mask(String secret)
    {
        return (secret == null || secret.isEmpty()) ? "<not set>" : "****";
    }
} 

Here is the implementation for Services Configuration:

Services Configuration:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.jta.JtaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.annotation.PostConstruct;
import javax.persistence.EntityManagerFactory;

/**
 * Wires the service layer: the JMS template, the JPA entity manager factory and the
 * Spring {@link JtaTransactionManager}, all built on the beans exposed by {@link JtaConfig}.
 * Annotation-driven transaction management is enabled and {@code xxx.xxx.service} is
 * scanned for components.
 */
@Configuration
@EnableTransactionManagement
@ComponentScan(basePackages = "xxx.xxx.service")
public class ServicesConfig
{
    private static final Logger appLogger = LoggerFactory.getLogger(ServicesConfig.class);
    
    @Autowired
    JtaConfig jtaConfig;
    
    /**
     * Creates the JMS template used by the services.
     *
     * @return a queue-oriented (non pub/sub) template bound to the XA connection factory
     */
    @Bean(name = "xaJmsTemplate")
    public JmsTemplate jmsTemplate()
    {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(jtaConfig.connectionFactory());
        jmsTemplate.setPubSubDomain(false);
        return jmsTemplate;
    }
    
    /**
     * Builds the JPA entity manager factory for the {@code xxx.xxx.model} entities.
     *
     * <p>The bean keeps its original (misspelled) name {@code entityManangerFactory} and is
     * additionally registered under the conventional name {@code entityManagerFactory}, so
     * lookups by either name resolve.
     *
     * <p>NOTE(review): the data source is registered with {@code setDataSource}; for the
     * persistence unit to take part in JTA transactions it probably needs
     * {@code setJtaDataSource} plus a Hibernate JTA platform setting. Confirm before relying
     * on database rollback.
     *
     * @return the native entity manager factory, initialised eagerly
     */
    @Bean(name = {"entityManangerFactory", "entityManagerFactory"})
    public EntityManagerFactory entityManagerFactory()
    {
        LocalContainerEntityManagerFactoryBean factoryBean = new LocalContainerEntityManagerFactoryBean();
        
        factoryBean.setPackagesToScan("xxx.xxx.model");
        factoryBean.setDataSource(jtaConfig.pgDataSource1());
        factoryBean.setJpaProperties(jtaConfig.hibernateProperties());
        factoryBean.setPersistenceUnitName("entityManagerFactoryA");
        factoryBean.setJpaVendorAdapter(jtaConfig.jpaVendorAdapter());
        // The factory bean is not itself registered with Spring, so it is initialised by hand.
        factoryBean.afterPropertiesSet();
        return factoryBean.getNativeEntityManagerFactory();
    }

    /**
     * Creates the Spring transaction manager that delegates to the Atomikos JTA
     * transaction manager and user transaction.
     *
     * @return the JTA-backed platform transaction manager
     */
    @Bean(name = "transactionManager")
    public PlatformTransactionManager transactionManager()
    {
        JtaTransactionManager ptm = new JtaTransactionManager();
        ptm.setTransactionManager(jtaConfig.atomikosTransactionManager());
        ptm.setUserTransaction(jtaConfig.userTransaction());
        return ptm;
    }
    
    /**
     * Logs the injected JTA configuration and the JMS template after construction.
     */
    @PostConstruct
    public void postConstructDetails()
    {
        appLogger.info("Post Construct Start: ServicesConfig.");
        // The value logged here is the JtaConfig bean itself, not a broker URL.
        appLogger.info("  - JTA Config: {}", jtaConfig);
        appLogger.info("  - JMS Template: {}", jmsTemplate());
        appLogger.info("Post Construct End: ServicesConfig.");
        appLogger.info(" ");
    }
}

Here is the Service implementation:

TransactionServiceImpl

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import xxx.xxx.Users;

@Service("transactionService")
@Transactional
public class TransactionServiceImpl implements TransactionServiceIntf
{
    private static final Logger appLogger = LoggerFactory.getLogger(TransactionServiceImpl.class);

    @Autowired
    @Qualifier("xaJmsTemplate")
    JmsTemplate jmsTemplate;

    @Override
    public Users save(Users _user)
    {
        appLogger.info("TransactionServiceImpl: save: Entered.");
        Users user = _user;
        try
        {
            if(user == null)
            {
                appLogger.info("User: Null.");
            }
            else
            {
                if(jmsTemplate == null)
                {
                    appLogger.info("JMS Template: Null.");
                }
                else
                {
                    appLogger.info("JMS Template: Saving.");
                    jmsTemplate.convertAndSend("crequests", user);
                }
            }
            // The rollback should happen with the exception.
            throw new Exception();
        }
        catch(Exception _e)
        {
            appLogger.error("Catching exception: " + _e);
        }
        appLogger.info("TransactionServiceImpl: save: Exiting.");
        return user;
    }
}
6
  • You don't need to configure anything. Simply use the atomikos starter and it will work Commented Mar 15, 2021 at 15:13
  • 1
    I don't know what Atomikos is, but I see that you are catching the exception. Normally @Transactional will not roll back if your method doesn't throw an exception.
    – Yuvaraj G
    Commented Mar 15, 2021 at 15:33
  • @SimonMartinelli: Could you please elaborate on this a little more?
    – Sam J Sem
    Commented Mar 15, 2021 at 15:35
  • 1 Message Broker and 1 Database in a XA Transaction with Atomikos is supported out of the box without configuration. You just have to add the atomikos starter dependency in maven pom.xml Commented Mar 15, 2021 at 15:51
  • @YuvarajG: That did the trick. Thank you.
    – Sam J Sem
    Commented Mar 15, 2021 at 17:03

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.