0

I have an application that process raw data and save it to database. It takes less than 100 ms to execute 1 thread. The application receives raw data from around 200 devices with in every 20 seconds.After a while application hangs and become not reachable. When looked at Busy Task Thread Count in wildfly, we found that Busy Task Thread Count reached the limit of 128 threads.

We couldn't figure out yet why this happens even if the processing time is very less. Can anyone help in solving this issue. Thanks in advance.

@Controller
@EnableAsync
public class DataController {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private IDataService dataService;

    @Autowired
    IDeviceService deviceService;

    @Autowired
    HttpServletResponse response;

    @Autowired
    HttpServletRequest request;

    @Autowired
    private IDeviceValidator deviceValidator;

    @Autowired
    private IJobService jobService;

    @Autowired
    private IDataAssembler dataAssembler;


    @RequestMapping(value = { "/device_dat/auth/auth_tocken/imei/{imei_no}",
            "/device_data/auth/auth_token/imei/{imei_no}" }, method = RequestMethod.POST, consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
    @ResponseBody
    public Object postDataText(@PathVariable(value = "imei_no") String imeiNo, @RequestBody String dataString) {
        Long startTimeMillis = System.currentTimeMillis();
        DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        log.error("PERF::" + request.getSession().getId().toString() + "::" + imeiNo + "::"
                + Thread.currentThread().getId() + "::postDataText::entry::"+ dataString+"::"+ sdf.format(startTimeMillis));
        try {
            Device device = (Device) deviceValidator.validateImeiNo(imeiNo);
            if (device != null) {
                DataVO dataVO = new DataVO();
                dataVO.setImeiNo(imeiNo);
                dataVO.setSerialNo(device.getSerialNo());
                device.setLastUpdatedTime(new Date());
                dataString = java.net.URLDecoder.decode(dataString, StandardCharsets.UTF_8.name());
                dataVO.setSensorDataList(dataString);
                List<Data> datas = dataAssembler.assembleData(dataVO, device);
                String prefix = dataString.substring(0, 11);
                deviceService.updateLastUpdatedData(datas, null, request.getSession().getId(), imeiNo, device);
                dataService.updateDeviceDataAndInsertToDB(datas, request.getSession().getId(), imeiNo, device);

            } else {
                Map<String, String> data = new HashMap<String, String>();
                data.put("message", "Device not Registered");
                ResponseVO responseVO = new ResponseVO(ResponseTypeEnum.FAILURE.toString(), 400, data);
                return responseVO;
            }
        } catch (Exception e) {
            log.error("Error occured on postDataText method:" + e.getMessage() + "\ndataString was :" + dataString);
        }
        // response.addHeader("Baeldung-Example-Header", "Value-HttpServletResponse");
        Long endTimeMillis = System.currentTimeMillis();
        log.error(
                "PERF::" + request.getSession().getId().toString() + "::" + imeiNo + "::" + Thread.currentThread().getId()
                        + "::postDataText::exit::" + sdf.format(endTimeMillis) + "::" + (endTimeMillis - startTimeMillis));
        return null;
    }
}
---------------------------------------------------------------------
@Service
@EnableAsync
public class DataServiceImpl implements IDataService {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @Autowired
    IDeviceService deviceService;

    @Autowired
    private DataDaoImpl dataDaoImpl;

    @Autowired
    private IDataRepository dataRepository;

    @Autowired
    private IApplicationService applicationService;

    @Override
    public Device updateDeviceDataAndInsertToDB(List<Data> datas, String prefix, String imeiNo, Device device) {
        Long startTimeMillis = System.currentTimeMillis();
        DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
                + "::updateDeviceDataAndInsertToDB::entry::" + sdf.format(startTimeMillis));

        try {
            List<Data> upDatedList = new ArrayList<Data>();
            datas.forEach(data -> {

                if (data.getSourceDate() == null) {
                    data.setSourceDate(data.getCreatedDate());
                }
                deviceService.updateLastUpdatedTime(data.getSerialNo(),device);

                data.setMetaData(device.getMetaData());
                upDatedList.add(data);
            });
            dataRepository.insert(upDatedList);

        } catch (Exception e) {
            log.error("Method:save" + e.getMessage() + ", " + datas);
        }
        Long endTimeMillis = System.currentTimeMillis();
        log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
                + "::updateDeviceDataAndInsertToDB::exit::" + sdf.format(endTimeMillis) + "::"
                + (endTimeMillis - startTimeMillis));
        return device;
    }
}

-------------------------------------------------------------------------------------------------------
@Service
@EnableAsync
public class DeviceServiceImpl implements IDeviceService {

    public final Logger log = LoggerFactory.getLogger(this.getClass());

    @Autowired
    IDeviceDao deviceDao;

    @Autowired
    IDeviceRepository deviceRepository;

    @Autowired
    IApplicationService applicationService;

    @Autowired
    RestTemplate restTemplate;

    @Autowired
    IDataService dataService;



    @Override
    @Async
    public void updateLastUpdatedData(List<Data> datas, Map<String, Object> metaData, String prefix, String imeiNo,
            Device device) {
        //sAsyncRestTemplate restTemplate = new AsyncRestTemplate();
        /*
         * WebClient client = WebClient.create(); Stream<Data> stringStream =
         * datas.stream(); Flux<Data> fluxFromStream = Flux.fromStream(stringStream);
         */

        Long startTimeMillis = System.currentTimeMillis();
        DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
                + "::updateLastUpdatedData::entry::" + sdf.format(startTimeMillis));

        log.trace("Enter updateLastUpdatedData");
        try {
            Map<String, Object> response = new HashMap<String, Object>();
            String applicationName = "gtest";

            String urlString = "http://localhost:8080/" + applicationName + "/devices/";

            String deviceSerialNo = device.getSerialNo();
            String imei_no = imeiNo;
            Map<String, Object> requestBody = new HashMap<String, Object>();
            // For batch packet Device Follows LIFO
            Data data = datas.get(0);
            requestBody = (Map<String, Object>) data.getSensorDataList();
            requestBody.put("source_date", data.getSourceDate());
            if (metaData != null) {
                requestBody.put("last_known_address", metaData.get("last_known_address"));
            }
            Map<String, Object> d = (Map<String, Object>) data.getSensorDataList();
            Long count = 0L, emergencyCount = 0L;

            for (Data datax : datas) {

                Map<String, Object> x = (Map<String, Object>) datax.getSensorDataList();
                try {
                    if (x.get("packet_type").toString().contains("Alert")) {
                        count = count + 1;
                    }
                    if (x.get("packet_type").toString().contains("Emergency button wire disconnect/wire-cut")) {
                        emergencyCount = emergencyCount + 1;
                    }
                } catch (Exception e) {
                    // log.error("method:updateLastUpdatedData,"+e.getMessage()+"d"+d+"
                    // ,data:"+data);
                }

            }
            requestBody.put("alert_count", count);
            requestBody.put("emergency_alert_count", emergencyCount);
            if (deviceSerialNo != null) {

                urlString += deviceSerialNo;

                restTemplate.postForObject(urlString, requestBody, Map.class);

            }


        } catch (Exception e) {
             log.error("method:updateLastUpdatedData,"+e.getMessage()+", datas"+datas);
        }
        log.trace("Exit updateLastUpdatedData");
        Long endTimeMillis = System.currentTimeMillis();
        log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
                + "::updateLastUpdatedData::exit::" + sdf.format(endTimeMillis) + "::" + (endTimeMillis - startTimeMillis));

    }
    }
    }
    --------------------------------------------------------------------------------------------

@Repository
public class DataRepository implements IDataRepository {

    public final Logger log = LoggerFactory.getLogger(this.getClass()); 
    @Override
    public void insert(List<Data> upDatedList) {
        log.trace("Enter insert");
        Query query = new Query();
        query.maxTimeMsec(10);
        MongoOperations mongoOperations = mongoTemplate;
        mongoTemplate.insert(upDatedList, Data.class);
        log.trace("Exit insert");

    }
}
2
  • You need to add your code. And you can also take thread dump to check what those busy threads are doing.
    – Ivan
    Commented May 16, 2020 at 14:10
  • Hi Ivan, Please look at the code I shared Commented May 16, 2020 at 16:33

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.