bulk_writer: fixes for JSON generation and added exception for use of 'tokens' varname

class_update/elastic_update: Moved response checking to elastic_update
dupe_detect: Finalized dupe_detect
master
Erik de Vries 6 years ago
parent 755a58d84d
commit 0e8c127b86

@ -4,9 +4,10 @@
#' Type can be either one of three values:
#' set: set the value of [varname] to x
#' add: add x to the values of [varname]
#' varname: When using tokens, the token field will be updated instead of a computerCodes field
#' @param x A single-row data frame, or a string containing the variables and/or values that should be updated (a data frame is converted to a JSON object, strings are stored as-is)
#' @param index The name of the Elasticsearch index to update
#' @param varname String indicating the parent variable that should be updated (when it does not exist, it will be created, all varnames arexed by computerCodes)
#' @param varname String indicating the parent variable that should be updated (when it does not exist, it will be created, all varnames are prefixed by computerCodes)
#' @param type Type of updating to be done, can be either 'set', 'add', or 'addnested'
#' @return A string usable as Elasticsearch bulk update command, in line-delimited JSON
#' @export
@ -18,11 +19,17 @@
bulk_writer <- function(x, index = 'maml', varname = 'updated_variable', type) {
### Create a json object if more than one variable besides _id, otherwise use value as-is
if (length(x) > 2) {
json <- toJSON(bind_rows(x[-1]), collapse = T)
} else {
json <- toJSON(bind_rows(x)[-1], collapse = T)
x} else {
names(x) <- NULL
json <- toJSON(x[-1], collapse = T)
}
if (varname == "tokens") {
return(
paste0('{"update": {"_index": "',index,'", "_type": "doc", "_id": "',x[1],'"}}
{ "script" : { "source": "ctx._source.tokens = params.code", "lang" : "painless", "params": { "code": ',json,'}}}')
)
}
if (type == 'set') {
return(
paste0('{"update": {"_index": "',index,'", "_type": "doc", "_id": "',x[1],'"}}

@ -10,22 +10,15 @@
#' @return As this is a nested function used within elasticizer, there is no return output
#' @export
#' @examples
#' class_update(out, localhost = T, model_final, dfm_words, varname, es_super)
#' class_update(out, localhost = T, model_final, dfm_words, varname, es_super = .rs.askForPassword('ElasticSearch WRITE'))
#################################################################################################
#################################### Update any kind of classification ##########################
#################################################################################################
class_update <- function(out, localhost = T, model_final, dfm_words, varname, es_super) {
class_update <- function(out, localhost = T, model_final, dfm_words, varname, es_super = .rs.askForPassword('ElasticSearch WRITE')) {
print('updating')
dfm <- dfm_gen(out, text = 'lemmas') %>%
dfm_keep(dfm_words, valuetype="fixed", verbose=T)
pred <- data.frame(id = out$`_id`, pred = predict(model_final, newdata = dfm))
bulk <- apply(pred, 1, bulk_writer, varname = varname, type = 'set')
res <- elastic_update(bulk, es_super = es_super, localhost = localhost)
httr:::stop_for_status(res)
appData <- httr:::content(res)
if (appData$errors == T){
print(appData)
stop("Aborting, errors found during updating")
}
print("updated")
}

@ -6,17 +6,18 @@
#' @param cutoff_lower Cutoff value for minimum cosine similarity above which documents are considered duplicates (inclusive)
#' @param cutoff_upper Cutoff value for maximum cosine similarity, above which documents are not considered duplicates (for debugging and manual parameter tuning, inclusive)
#' @param es_pwd Password for Elasticsearch read access
#' @param es_super Password for write access to ElasticSearch
#' @param words Document cutoff point in number of words. Documents are cut off at the last [.?!] before the cutoff (so document will be a little shorter than [words])
#' @param localhost Defaults to true. When true, connect to a local Elasticsearch instance on the default port (9200)
#' @return dupe_objects.json and data frame containing each id and all its duplicates. remove_ids.txt and character vector with list of ids to be removed. Files are in current working directory
#' @export
#' @examples
#' dupe_detect(1,grid,cutoff_lower, cutoff_upper = 1, es_pwd, words)
#' dupe_detect(1,grid,cutoff_lower, cutoff_upper = 1, es_pwd, es_super, words, localhost = T)
#################################################################################################
#################################### Duplicate detector ################################
#################################################################################################
dupe_detect <- function(row, grid, cutoff_lower, cutoff_upper = 1, es_pwd, words) {
dupe_detect <- function(row, grid, cutoff_lower, cutoff_upper = 1, es_pwd, es_super, words, localhost = T) {
params <- grid[row,]
print(paste0('Parsing ',params$doctypes,' on ',params$dates ))
query <- paste0('{"query":
@ -29,10 +30,8 @@ dupe_detect <- function(row, grid, cutoff_lower, cutoff_upper = 1, es_pwd, words
}}]
} } }')
out <- elasticizer(query, es_pwd = es_pwd)
if (out$hits$total > 0) {
out <- elasticizer(query, es_pwd = es_pwd, localhost=T)
if (class(out$hits$hits) != 'list') {
dfm <- dfm_gen(out, text = "full", words = words)
simil <- as.matrix(textstat_simil(dfm, margin="documents", method="cosine"))
diag(simil) <- NA
@ -42,14 +41,21 @@ dupe_detect <- function(row, grid, cutoff_lower, cutoff_upper = 1, es_pwd, words
.[,c(1,4)] %>%
group_by(colid) %>% summarise(rowid=list(rowid))
text <- capture.output(stream_out(df))
write(text[-length(text)], file = paste0(getwd(),'/dupe_objects.json'), append=T)
# write(text[-length(text)], file = paste0(getwd(),'/dupe_objects.json'), append=T)
simil[upper.tri(simil)] <- NA
write(unique(rownames(which(simil >= cutoff_lower & simil <= cutoff_upper, arr.ind = TRUE))),
file = paste0(getwd(),'/remove_ids.txt'),
append=T)
return(list(df,unique(rownames(which(simil >= cutoff_lower & simil <= cutoff_upper, arr.ind = TRUE)))))
# write(unique(rownames(which(simil >= cutoff_lower & simil <= cutoff_upper, arr.ind = TRUE))),
# file = paste0(getwd(),'/remove_ids.txt'),
# append=T)
dupe_delete <- data.frame(id=unique(rownames(which(simil >= cutoff_lower & simil <= cutoff_upper, arr.ind = TRUE))),
dupe_delete = rep(1,length(unique(rownames(which(simil >= cutoff_lower & simil <= cutoff_upper, arr.ind = TRUE))))))
bulk <- c(apply(df, 1, bulk_writer, varname='duplicates', type = 'set'),
apply(dupe_delete, 1, bulk_writer, varname='_delete', type = 'set'))
if (length(bulk) > 0) {
res <- elastic_update(bulk, es_super = es_super, localhost = localhost)
}
return(paste0('Checked ',params$doctypes,' on ',params$dates ))
} else {
return(NULL)
return(paste0('No results for ',params$doctypes,' on ',params$dates ))
}
### Dummy code to verify that filtering out unique ids using the bottom half of the matrix actually works
# id_filter <- unique(rownames(which(simil >= cutoff, arr.ind = TRUE)))

@ -27,5 +27,12 @@ elastic_update <- function(x, es_super = 'secret', localhost = T) {
, times = 10
, pause_min = 10
)
return(res)
httr:::stop_for_status(res)
appData <- httr:::content(res)
if (appData$errors == T){
print(appData)
stop("Aborting, errors found during updating")
}
print("updated")
return(1)
}

@ -11,7 +11,7 @@ bulk_writer(x, index = "maml", varname = "updated_variable", type)
\item{index}{The name of the Elasticsearch index to update}
\item{varname}{String indicating the parent variable that should be updated (when it does not exist, it will be created, all varnames arexed by computerCodes)}
\item{varname}{String indicating the parent variable that should be updated (when it does not exist, it will be created, all varnames are prefixed by computerCodes)}
\item{type}{Type of updating to be done, can be either 'set', 'add', or 'addnested'}
}
@ -23,6 +23,7 @@ Generate a line-delimited JSON string for use in Elasticsearch bulk updates
Type can be either one of three values:
set: set the value of [varname] to x
add: add x to the values of [varname]
varname: When using tokens, the token field will be updated instead of a computerCodes field
}
\examples{
bulk_writer(x, index = 'maml', varname = 'updated_variable')

@ -5,7 +5,7 @@
\title{Classifier function for use in combination with the elasticizer function as 'update' parameter (without brackets), see elasticizer documentation for more information}
\usage{
class_update(out, localhost = T, model_final, dfm_words, varname,
es_super)
es_super = .rs.askForPassword("ElasticSearch WRITE"))
}
\arguments{
\item{out}{Does not need to be defined explicitly! (is already parsed in the elasticizer function)}
@ -27,5 +27,5 @@ As this is a nested function used within elasticizer, there is no return output
Classifier function for use in combination with the elasticizer function as 'update' parameter (without brackets), see elasticizer documentation for more information
}
\examples{
class_update(out, localhost = T, model_final, dfm_words, varname, es_super)
class_update(out, localhost = T, model_final, dfm_words, varname, es_super = .rs.askForPassword('ElasticSearch WRITE'))
}

Loading…
Cancel
Save