-
Notifications
You must be signed in to change notification settings - Fork 168
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cohort demographics results are available for visualization if explicitly requested while executing #2402
base: master
Are you sure you want to change the base?
Cohort demographics results are available for visualization if explicitly requested while executing #2402
Changes from 6 commits
3ad6e17
645ace8
a27e6f0
614ff5b
65eab63
9f80bd6
c46e545
1b39103
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,21 +16,38 @@ | |
package org.ohdsi.webapi.cohortdefinition; | ||
|
||
import org.ohdsi.circe.helper.ResourceHelper; | ||
import org.ohdsi.cohortcharacterization.CCQueryBuilder; | ||
import org.ohdsi.sql.BigQuerySparkTranslate; | ||
import org.ohdsi.sql.SqlRender; | ||
import org.ohdsi.sql.SqlSplit; | ||
import org.ohdsi.sql.SqlTranslate; | ||
import org.ohdsi.webapi.cohortcharacterization.domain.CohortCharacterizationEntity; | ||
import org.ohdsi.webapi.common.generation.CancelableTasklet; | ||
import org.ohdsi.webapi.common.generation.GenerationUtils; | ||
import org.ohdsi.webapi.feanalysis.domain.FeAnalysisEntity; | ||
import org.ohdsi.webapi.feanalysis.repository.FeAnalysisEntityRepository; | ||
import org.ohdsi.webapi.generationcache.GenerationCacheHelper; | ||
import org.ohdsi.webapi.shiro.Entities.UserRepository; | ||
import org.ohdsi.webapi.source.Source; | ||
import org.ohdsi.webapi.source.SourceService; | ||
import org.ohdsi.webapi.util.CancelableJdbcTemplate; | ||
import org.ohdsi.webapi.util.SessionUtils; | ||
import org.ohdsi.webapi.util.SourceUtils; | ||
import org.slf4j.LoggerFactory; | ||
import org.springframework.batch.core.scope.context.ChunkContext; | ||
import org.springframework.batch.core.step.tasklet.StoppableTasklet; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.transaction.support.TransactionTemplate; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import com.odysseusinc.arachne.commons.types.DBMSType; | ||
|
||
import java.sql.SQLException; | ||
import java.util.Arrays; | ||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.ohdsi.webapi.Constants.Params.*; | ||
|
||
|
@@ -44,54 +61,148 @@ public class GenerateCohortTasklet extends CancelableTasklet implements Stoppabl | |
private final GenerationCacheHelper generationCacheHelper; | ||
private final CohortDefinitionRepository cohortDefinitionRepository; | ||
private final SourceService sourceService; | ||
private final FeAnalysisEntityRepository feAnalysisRepository; | ||
|
||
// Creates the tasklet without a feature-analysis repository. | ||
// NOTE(review): this overload leaves feAnalysisRepository null, while prepareQueriesDemographic calls | ||
// feAnalysisRepository.findByListIds(...) without a null check. A tasklet built through this constructor | ||
// therefore looks like it would throw a NullPointerException when the DEMOGRAPHIC_STATS job parameter | ||
// is "true" - confirm that no caller combines the two. | ||
public GenerateCohortTasklet(final CancelableJdbcTemplate jdbcTemplate, final TransactionTemplate transactionTemplate, | ||
final GenerationCacheHelper generationCacheHelper, | ||
final CohortDefinitionRepository cohortDefinitionRepository, final SourceService sourceService) { | ||
// Hands a logger named after this class, plus the JDBC and transaction templates, to CancelableTasklet. | ||
super(LoggerFactory.getLogger(GenerateCohortTasklet.class), jdbcTemplate, transactionTemplate); | ||
this.generationCacheHelper = generationCacheHelper; | ||
this.cohortDefinitionRepository = cohortDefinitionRepository; | ||
this.sourceService = sourceService; | ||
// No repository is available through this overload; see the review note above. | ||
this.feAnalysisRepository = null; | ||
} | ||
|
||
public GenerateCohortTasklet( | ||
final CancelableJdbcTemplate jdbcTemplate, | ||
final TransactionTemplate transactionTemplate, | ||
final GenerationCacheHelper generationCacheHelper, | ||
final CohortDefinitionRepository cohortDefinitionRepository, | ||
final SourceService sourceService | ||
final SourceService sourceService, final FeAnalysisEntityRepository feAnalysisRepository | ||
) { | ||
super(LoggerFactory.getLogger(GenerateCohortTasklet.class), jdbcTemplate, transactionTemplate); | ||
this.generationCacheHelper = generationCacheHelper; | ||
this.cohortDefinitionRepository = cohortDefinitionRepository; | ||
this.sourceService = sourceService; | ||
this.feAnalysisRepository = feAnalysisRepository; | ||
} | ||
|
||
// Chooses which SQL statements this tasklet prepares for the current job. | ||
// When the DEMOGRAPHIC_STATS job parameter is present and parses to true, the statements come from | ||
// prepareQueriesDemographic; in every other case (parameter absent, or any value other than "true", | ||
// ignoring case) they come from prepareQueriesDefault. | ||
@Override | ||
protected String[] prepareQueries(ChunkContext chunkContext, CancelableJdbcTemplate jdbcTemplate) { | ||
Map<String, Object> jobParams = chunkContext.getStepContext().getJobParameters(); | ||
|
||
// null means the parameter was not supplied at all. The value is cast to String before parsing, | ||
// so a non-String parameter value would raise a ClassCastException here. | ||
Boolean demographicStat = jobParams.get(DEMOGRAPHIC_STATS) == null ? null | ||
: Boolean.valueOf((String) jobParams.get(DEMOGRAPHIC_STATS)); | ||
|
||
// Demographic statistics are generated only when explicitly requested. | ||
if (demographicStat != null && demographicStat.booleanValue()) { | ||
return prepareQueriesDemographic(chunkContext, jdbcTemplate); | ||
} | ||
|
||
// Default path: only the job parameters are needed, not the whole chunk context. | ||
return prepareQueriesDefault(jobParams, jdbcTemplate); | ||
} | ||
|
||
private String[] prepareQueriesDemographic(ChunkContext chunkContext, CancelableJdbcTemplate jdbcTemplate) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This logic has been taken from Cohort Characterization. |
||
Map<String, Object> jobParams = chunkContext.getStepContext().getJobParameters(); | ||
CohortCharacterizationEntity cohortCharacterization = new CohortCharacterizationEntity(); | ||
|
||
Integer cohortDefinitionId = Integer.valueOf(jobParams.get(COHORT_DEFINITION_ID).toString()); | ||
CohortDefinition cohortDefinition = cohortDefinitionRepository.findOneWithDetail(cohortDefinitionId); | ||
|
||
cohortCharacterization.setCohortDefinitions(new HashSet<>(Arrays.asList(cohortDefinition))); | ||
|
||
// Get FE Analysis Demographic (Gender, Age, Race,) | ||
Set<FeAnalysisEntity> feAnalysis = feAnalysisRepository.findByListIds(Arrays.asList(70, 72, 74, 77)); | ||
|
||
// Set<CcFeAnalysisEntity> ccFeAnalysis = feAnalysis.stream().map(a -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This relates to https://github.com/OHDSI/WebAPI/pull/2388/files, once that pull request is merged into 'master'. |
||
// CcFeAnalysisEntity ccA = new CcFeAnalysisEntity(); | ||
// ccA.setCohortCharacterization(cohortCharacterization); | ||
// ccA.setFeatureAnalysis(a); | ||
// return ccA; | ||
// }).collect(Collectors.toSet()); | ||
|
||
cohortCharacterization.setFeatureAnalyses(feAnalysis); | ||
|
||
final Long jobId = chunkContext.getStepContext().getStepExecution().getJobExecution().getId(); | ||
|
||
final Integer sourceId = Integer.valueOf(jobParams.get(SOURCE_ID).toString()); | ||
final Source source = sourceService.findBySourceId(sourceId); | ||
|
||
final String cohortTable = jobParams.get(TARGET_TABLE).toString(); | ||
final String sessionId = jobParams.get(SESSION_ID).toString(); | ||
|
||
final String tempSchema = SourceUtils.getTempQualifier(source); | ||
|
||
boolean includeAnnual = false; | ||
boolean includeTemporal = false; | ||
|
||
CCQueryBuilder ccQueryBuilder = new CCQueryBuilder(cohortCharacterization, cohortTable, sessionId, | ||
SourceUtils.getCdmQualifier(source), SourceUtils.getResultsQualifier(source), | ||
SourceUtils.getVocabularyQualifier(source), tempSchema, jobId); | ||
String sql = ccQueryBuilder.build(); | ||
|
||
/* | ||
* There is an issue with temp tables on sql server: Temp tables scope is | ||
* session or stored procedure. To execute PreparedStatement sql server | ||
* uses stored procedure <i>sp_executesql</i> and this is the reason why | ||
* multiple PreparedStatements cannot share the same local temporary | ||
* table. | ||
* | ||
* On the other side, temp tables cannot be re-used in the same | ||
* PreparedStatement, e.g. temp table cannot be created, used, dropped and | ||
* created again in the same PreparedStatement because sql optimizator | ||
* detects object already exists and fails. When is required to re-use | ||
* temp table it should be separated to several PreparedStatements. | ||
* | ||
* An option to use global temp tables also doesn't work since such tables | ||
* can be not supported / disabled. | ||
* | ||
* Therefore, there are two ways: - either precisely group SQLs into | ||
* statements so that temp tables aren't re-used in a single statement, - | ||
* or use ‘permanent temporary tables’ | ||
* | ||
* The second option looks better since such SQL could be exported and | ||
* executed manually, which is not the case with the first option. | ||
*/ | ||
if (ImmutableList.of(DBMSType.MS_SQL_SERVER.getOhdsiDB(), DBMSType.PDW.getOhdsiDB()) | ||
.contains(source.getSourceDialect())) { | ||
sql = sql.replaceAll("#", tempSchema + "." + sessionId + "_").replaceAll("tempdb\\.\\.", ""); | ||
} | ||
if (source.getSourceDialect().equals("spark")) { | ||
try { | ||
sql = BigQuerySparkTranslate.sparkHandleInsert(sql, source.getSourceConnection()); | ||
} catch (SQLException e) { | ||
e.printStackTrace(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A log entry should be created instead of printing the stack trace, both here and in the source code from which this was copied. |
||
} | ||
} | ||
|
||
final String translatedSql = SqlTranslate.translateSql(sql, source.getSourceDialect(), sessionId, tempSchema); | ||
return SqlSplit.splitSql(translatedSql); | ||
} | ||
|
||
private String[] prepareQueriesDefault(Map<String, Object> jobParams, CancelableJdbcTemplate jdbcTemplate) { | ||
Integer cohortDefinitionId = Integer.valueOf(jobParams.get(COHORT_DEFINITION_ID).toString()); | ||
Integer sourceId = Integer.parseInt(jobParams.get(SOURCE_ID).toString()); | ||
String targetSchema = jobParams.get(TARGET_DATABASE_SCHEMA).toString(); | ||
String sessionId = jobParams.getOrDefault(SESSION_ID, SessionUtils.sessionId()).toString(); | ||
|
||
CohortDefinition cohortDefinition = cohortDefinitionRepository.findOneWithDetail(cohortDefinitionId); | ||
Source source = sourceService.findBySourceId(sourceId); | ||
|
||
CohortGenerationRequestBuilder generationRequestBuilder = new CohortGenerationRequestBuilder(sessionId, | ||
targetSchema); | ||
|
||
int designHash = this.generationCacheHelper.computeHash(cohortDefinition.getDetails().getExpression()); | ||
CohortGenerationUtils.insertInclusionRules(cohortDefinition, source, designHash, targetSchema, sessionId, | ||
jdbcTemplate); | ||
|
||
GenerationCacheHelper.CacheResult res = generationCacheHelper.computeCacheIfAbsent(cohortDefinition, source, | ||
generationRequestBuilder, | ||
(resId, sqls) -> generationCacheHelper.runCancelableCohortGeneration(jdbcTemplate, stmtCancel, sqls)); | ||
|
||
Map<String, Object> jobParams = chunkContext.getStepContext().getJobParameters(); | ||
|
||
Integer cohortDefinitionId = Integer.valueOf(jobParams.get(COHORT_DEFINITION_ID).toString()); | ||
Integer sourceId = Integer.parseInt(jobParams.get(SOURCE_ID).toString()); | ||
String targetSchema = jobParams.get(TARGET_DATABASE_SCHEMA).toString(); | ||
String sessionId = jobParams.getOrDefault(SESSION_ID, SessionUtils.sessionId()).toString(); | ||
|
||
CohortDefinition cohortDefinition = cohortDefinitionRepository.findOneWithDetail(cohortDefinitionId); | ||
Source source = sourceService.findBySourceId(sourceId); | ||
|
||
CohortGenerationRequestBuilder generationRequestBuilder = new CohortGenerationRequestBuilder( | ||
sessionId, | ||
targetSchema | ||
); | ||
|
||
int designHash = this.generationCacheHelper.computeHash(cohortDefinition.getDetails().getExpression()); | ||
CohortGenerationUtils.insertInclusionRules(cohortDefinition, source, designHash, targetSchema, sessionId, jdbcTemplate); | ||
|
||
GenerationCacheHelper.CacheResult res = generationCacheHelper.computeCacheIfAbsent( | ||
cohortDefinition, | ||
source, | ||
generationRequestBuilder, | ||
(resId, sqls) -> generationCacheHelper.runCancelableCohortGeneration(jdbcTemplate, stmtCancel, sqls) | ||
); | ||
|
||
String sql = SqlRender.renderSql( | ||
copyGenerationIntoCohortTableSql, | ||
new String[]{ RESULTS_DATABASE_SCHEMA, COHORT_DEFINITION_ID, DESIGN_HASH }, | ||
new String[]{ targetSchema, cohortDefinition.getId().toString(), res.getIdentifier().toString() } | ||
); | ||
sql = SqlTranslate.translateSql(sql, source.getSourceDialect()); | ||
return SqlSplit.splitSql(sql); | ||
String sql = SqlRender.renderSql(copyGenerationIntoCohortTableSql, | ||
new String[] { RESULTS_DATABASE_SCHEMA, COHORT_DEFINITION_ID, DESIGN_HASH }, | ||
new String[] { targetSchema, cohortDefinition.getId().toString(), res.getIdentifier().toString() }); | ||
sql = SqlTranslate.translateSql(sql, source.getSourceDialect()); | ||
return SqlSplit.splitSql(sql); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,8 @@ | |
|
||
import java.util.List; | ||
|
||
import org.ohdsi.webapi.cohortcharacterization.report.Report; | ||
|
||
/** | ||
* | ||
* @author Chris Knoll <[email protected]> | ||
|
@@ -42,5 +44,10 @@ public static class InclusionRuleStatistic | |
public Summary summary; | ||
public List<InclusionRuleStatistic> inclusionRuleStats; | ||
public String treemapData; | ||
public List<Report> demographicsStats; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A way should be found to avoid extending the existing InclusionRuleReport class, as the demographics statistics and the inclusion rule report are unrelated to each other. |
||
|
||
public Float prevalenceThreshold = 0.01f; | ||
public Boolean showEmptyResults = false; | ||
public int count = 0; | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This identifier is necessary to correlate a Cohort execution with its associated demographics Cohort Characterization.