I have an application that process raw data and save it to database. It takes less than 100 ms to execute 1 thread. The application receives raw data from around 200 devices with in every 20 seconds.After a while application hangs and become not reachable. When looked at Busy Task Thread Count in wildfly, we found that Busy Task Thread Count reached the limit of 128 threads.
We couldn't figure out yet why this happens even if the processing time is very less. Can anyone help in solving this issue. Thanks in advance.
@Controller
@EnableAsync
public class DataController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
private IDataService dataService;
@Autowired
IDeviceService deviceService;
@Autowired
HttpServletResponse response;
@Autowired
HttpServletRequest request;
@Autowired
private IDeviceValidator deviceValidator;
@Autowired
private IJobService jobService;
@Autowired
private IDataAssembler dataAssembler;
@RequestMapping(value = { "/device_dat/auth/auth_tocken/imei/{imei_no}",
"/device_data/auth/auth_token/imei/{imei_no}" }, method = RequestMethod.POST, consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
@ResponseBody
public Object postDataText(@PathVariable(value = "imei_no") String imeiNo, @RequestBody String dataString) {
Long startTimeMillis = System.currentTimeMillis();
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
log.error("PERF::" + request.getSession().getId().toString() + "::" + imeiNo + "::"
+ Thread.currentThread().getId() + "::postDataText::entry::"+ dataString+"::"+ sdf.format(startTimeMillis));
try {
Device device = (Device) deviceValidator.validateImeiNo(imeiNo);
if (device != null) {
DataVO dataVO = new DataVO();
dataVO.setImeiNo(imeiNo);
dataVO.setSerialNo(device.getSerialNo());
device.setLastUpdatedTime(new Date());
dataString = java.net.URLDecoder.decode(dataString, StandardCharsets.UTF_8.name());
dataVO.setSensorDataList(dataString);
List<Data> datas = dataAssembler.assembleData(dataVO, device);
String prefix = dataString.substring(0, 11);
deviceService.updateLastUpdatedData(datas, null, request.getSession().getId(), imeiNo, device);
dataService.updateDeviceDataAndInsertToDB(datas, request.getSession().getId(), imeiNo, device);
} else {
Map<String, String> data = new HashMap<String, String>();
data.put("message", "Device not Registered");
ResponseVO responseVO = new ResponseVO(ResponseTypeEnum.FAILURE.toString(), 400, data);
return responseVO;
}
} catch (Exception e) {
log.error("Error occured on postDataText method:" + e.getMessage() + "\ndataString was :" + dataString);
}
// response.addHeader("Baeldung-Example-Header", "Value-HttpServletResponse");
Long endTimeMillis = System.currentTimeMillis();
log.error(
"PERF::" + request.getSession().getId().toString() + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::postDataText::exit::" + sdf.format(endTimeMillis) + "::" + (endTimeMillis - startTimeMillis));
return null;
}
}
---------------------------------------------------------------------
@Service
@EnableAsync
public class DataServiceImpl implements IDataService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
IDeviceService deviceService;
@Autowired
private DataDaoImpl dataDaoImpl;
@Autowired
private IDataRepository dataRepository;
@Autowired
private IApplicationService applicationService;
@Override
public Device updateDeviceDataAndInsertToDB(List<Data> datas, String prefix, String imeiNo, Device device) {
Long startTimeMillis = System.currentTimeMillis();
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::updateDeviceDataAndInsertToDB::entry::" + sdf.format(startTimeMillis));
try {
List<Data> upDatedList = new ArrayList<Data>();
datas.forEach(data -> {
if (data.getSourceDate() == null) {
data.setSourceDate(data.getCreatedDate());
}
deviceService.updateLastUpdatedTime(data.getSerialNo(),device);
data.setMetaData(device.getMetaData());
upDatedList.add(data);
});
dataRepository.insert(upDatedList);
} catch (Exception e) {
log.error("Method:save" + e.getMessage() + ", " + datas);
}
Long endTimeMillis = System.currentTimeMillis();
log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::updateDeviceDataAndInsertToDB::exit::" + sdf.format(endTimeMillis) + "::"
+ (endTimeMillis - startTimeMillis));
return device;
}
}
-------------------------------------------------------------------------------------------------------
@Service
@EnableAsync
public class DeviceServiceImpl implements IDeviceService {
public final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
IDeviceDao deviceDao;
@Autowired
IDeviceRepository deviceRepository;
@Autowired
IApplicationService applicationService;
@Autowired
RestTemplate restTemplate;
@Autowired
IDataService dataService;
@Override
@Async
public void updateLastUpdatedData(List<Data> datas, Map<String, Object> metaData, String prefix, String imeiNo,
Device device) {
//sAsyncRestTemplate restTemplate = new AsyncRestTemplate();
/*
* WebClient client = WebClient.create(); Stream<Data> stringStream =
* datas.stream(); Flux<Data> fluxFromStream = Flux.fromStream(stringStream);
*/
Long startTimeMillis = System.currentTimeMillis();
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::updateLastUpdatedData::entry::" + sdf.format(startTimeMillis));
log.trace("Enter updateLastUpdatedData");
try {
Map<String, Object> response = new HashMap<String, Object>();
String applicationName = "gtest";
String urlString = "http://localhost:8080/" + applicationName + "/devices/";
String deviceSerialNo = device.getSerialNo();
String imei_no = imeiNo;
Map<String, Object> requestBody = new HashMap<String, Object>();
// For batch packet Device Follows LIFO
Data data = datas.get(0);
requestBody = (Map<String, Object>) data.getSensorDataList();
requestBody.put("source_date", data.getSourceDate());
if (metaData != null) {
requestBody.put("last_known_address", metaData.get("last_known_address"));
}
Map<String, Object> d = (Map<String, Object>) data.getSensorDataList();
Long count = 0L, emergencyCount = 0L;
for (Data datax : datas) {
Map<String, Object> x = (Map<String, Object>) datax.getSensorDataList();
try {
if (x.get("packet_type").toString().contains("Alert")) {
count = count + 1;
}
if (x.get("packet_type").toString().contains("Emergency button wire disconnect/wire-cut")) {
emergencyCount = emergencyCount + 1;
}
} catch (Exception e) {
// log.error("method:updateLastUpdatedData,"+e.getMessage()+"d"+d+"
// ,data:"+data);
}
}
requestBody.put("alert_count", count);
requestBody.put("emergency_alert_count", emergencyCount);
if (deviceSerialNo != null) {
urlString += deviceSerialNo;
restTemplate.postForObject(urlString, requestBody, Map.class);
}
} catch (Exception e) {
log.error("method:updateLastUpdatedData,"+e.getMessage()+", datas"+datas);
}
log.trace("Exit updateLastUpdatedData");
Long endTimeMillis = System.currentTimeMillis();
log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::updateLastUpdatedData::exit::" + sdf.format(endTimeMillis) + "::" + (endTimeMillis - startTimeMillis));
}
}
}
--------------------------------------------------------------------------------------------
@Repository
public class DataRepository implements IDataRepository {
public final Logger log = LoggerFactory.getLogger(this.getClass());
@Override
public void insert(List<Data> upDatedList) {
log.trace("Enter insert");
Query query = new Query();
query.maxTimeMsec(10);
MongoOperations mongoOperations = mongoTemplate;
mongoTemplate.insert(upDatedList, Data.class);
log.trace("Exit insert");
}
}