From f543d658bd01464cb965ec995449d238f1304c70 Mon Sep 17 00:00:00 2001 From: Erik de Vries Date: Tue, 13 Nov 2018 15:03:33 +0100 Subject: [PATCH] Major overhaul to ES bulk update integration. Added support for both setting and appending to variables --- R/bulk_writer.R | 33 ++++++++++++++++++++++++++------- R/class_update.R | 11 +++++------ R/elastic_update.R | 13 +++++-------- R/elasticizer.R | 13 +++++++------ man/bulk_writer.Rd | 9 +++++++-- man/class_update.Rd | 7 +++++-- man/elastic_update.Rd | 4 ++-- man/elasticizer.Rd | 8 ++++---- 8 files changed, 61 insertions(+), 37 deletions(-) diff --git a/R/bulk_writer.R b/R/bulk_writer.R index 91e90d9..e0f0400 100644 --- a/R/bulk_writer.R +++ b/R/bulk_writer.R @@ -1,9 +1,13 @@ #' Generate a line-delimited JSON string for use in Elasticsearch bulk updates #' #' Generate a line-delimited JSON string for use in Elasticsearch bulk updates +#' Type can be one of two values: +#' set: set the value of [varname] to x +#' add: add x to the values of [varname] #' @param x A single-row data frame, or a string containing the variables and/or values that should be updated (a data frame is converted to a JSON object, strings are stored as-is) #' @param index The name of the Elasticsearch index to update -#' @param varname String indicating the parent variable that should be updated (when it does not exist, it will be created) +#' @param varname String indicating the parent variable that should be updated (when it does not exist, it will be created, all varnames are prefixed by computerCodes) +#' @param type Type of updating to be done, can be either 'set' or 'add' #' @return A string usable as Elasticsearch bulk update command, in line-delimited JSON #' @export #' @examples @@ -11,9 +15,24 @@ ################################################################################################# #################################### Bulk update writer ################################ 
################################################################################################# -bulk_writer <- function(x, index = 'maml', varname = 'updated_variable') { - return( - paste0('{"update": {"_index": "',index,'", "_type": "doc", "_id": "',x[1],'"}} - { "script" : { "source": "ctx._source.',varname,' = params.code", "lang" : "painless","params" : {"code":',toJSON(x[-1], collapse = F),'}}}') - ) -} \ No newline at end of file +bulk_writer <- function(x, index = 'maml', varname = 'updated_variable', type) { + ### Create a json object if more than one variable besides _id, otherwise use value as-is + if (length(x) > 2) { + json <- toJSON(bind_rows(x[-1]), collapse = T) + } else { + json <- toJSON(x[-1], collapse = T) + } + if (type == 'set') { + return( + paste0('{"update": {"_index": "',index,'", "_type": "doc", "_id": "',x[1],'"}} +{ "script" : { "source": "if (ctx._source.computerCodes != null) {ctx._source.computerCodes.',varname,' = params.code} else {ctx._source.computerCodes = params.object}", "lang" : "painless", "params": { "code": ',json,', "object": {"',varname,'": ',json,'} }}}') + ) + } + if (type == "add") { + return( + paste0('{"update": {"_index": "',index,'", "_type": "doc", "_id": "',x[1],'"}} + {"script": {"source": "if (ctx._source.computerCodes != null && ctx._source.computerCodes.containsKey(\\"',varname,'\\")) {ctx._source.computerCodes.',varname,'.addAll(params.code)} else if (ctx._source.computerCodes != null) {ctx._source.computerCodes.',varname,' = params.code} else {ctx._source.computerCodes = params.object}", "lang" : "painless", "params": { "code": ',json,' , "object": {"',varname,'": ',json,'}}}}' + ) + ) + } +} diff --git a/R/class_update.R b/R/class_update.R index 845526c..187036c 100644 --- a/R/class_update.R +++ b/R/class_update.R @@ -2,6 +2,7 @@ #' #' Classifier function for use in combination with the elasticizer function as 'update' parameter (without brackets), see elasticizer documentation for more information #' 
@param out Does not need to be defined explicitly! (is already parsed in the elasticizer function) +#' @param localhost Defaults to true. When true, connect to a local Elasticsearch instance on the default port (9200) #' @param model_final The classification model (e.g. output from textstat_nb(), svm() or others) #' @param dfm_words A dfm containing all the words and only the words used to generate the model (is used for subsetting) #' @param varname String containing the variable name to use for the classification result, usually has the format computerCodes.varname @@ -9,24 +10,22 @@ #' @return As this is a nested function used within elasticizer, there is no return output #' @export #' @examples -#' elasticizer(query, src = T, es_pwd = es_pwd, update = class_update, model_final = model_final, dfm_words = dfm_words, varname = computerCodes.varname, es_super = es_super) +#' class_update(out, localhost = T, model_final, dfm_words, varname, es_super) ################################################################################################# #################################### Update any kind of classification ########################## ################################################################################################# -class_update <- function(out, model_final, dfm_words, varname, es_super) { +class_update <- function(out, localhost = T, model_final, dfm_words, varname, es_super) { print('updating') dfm <- dfm_gen(out, text = 'lemmas') %>% dfm_keep(dfm_words, valuetype="fixed", verbose=T) pred <- data.frame(id = out$`_id`, pred = predict(model_final, newdata = dfm)) - bulk <- apply(pred, 1, bulk_writer, varname = varname) - res <- elastic_update(bulk, es_super = es_super) + bulk <- apply(pred, 1, bulk_writer, varname = varname, type = 'set') + res <- elastic_update(bulk, es_super = es_super, localhost = localhost) stop_for_status(res) - content(res, "parsed", "application/json") appData <- content(res) if (appData$errors == T){ print(appData) 
stop("Aborting, errors found during updating") } print("updated") - Sys.sleep(1) } diff --git a/R/elastic_update.R b/R/elastic_update.R index 54e307c..ce2af64 100644 --- a/R/elastic_update.R +++ b/R/elastic_update.R @@ -3,7 +3,7 @@ #' Push a line-delimited JSON string to Elasticsearch as bulk update #' @param x Line-delimited JSON suitable for use as Elasticsearch bulk update #' @param es_super The even-more-secret (do not store this anywhere!!!) password for updating (or messing up!) the entire database -#' @param local Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200) +#' @param localhost Defaults to true. When true, connect to a local Elasticsearch instance on the default port (9200) #' @return An html response object indicating the status of the update #' @export #' @examples @@ -12,12 +12,12 @@ ################################################################################################# #################################### Elasticsearch Updater ################################ ################################################################################################# -elastic_update <- function(x, es_super = 'secret', local = F) { - bulk <- paste0(x,'\n') - if (local == F) { +elastic_update <- function(x, es_super = 'secret', localhost = T) { + bulk <- paste0(paste0(x, collapse = '\n'),'\n') + if (localhost == F) { url <- paste0('https://super:',es_super,'@linux01.uis.no/es/_bulk?pretty&refresh=wait_for') } - if (local == T) { + if (localhost == T) { url <- 'http://localhost:9200/_bulk?pretty' } res <- RETRY("POST", url = url @@ -27,8 +27,5 @@ elastic_update <- function(x, es_super = 'secret', local = F) { , times = 10 , pause_min = 10 ) - # stop_for_status(res) - # content(res, "parsed", "application/json") - # appData <- content(res) return(res) } diff --git a/R/elasticizer.R b/R/elasticizer.R index 550acc4..15e782b 100644 --- a/R/elasticizer.R +++ b/R/elasticizer.R @@ -11,12 +11,13 @@ #' @return A 
data frame containing all the search results #' @export #' @examples -#' elasticizer(query, src = TRUE, index = "maml", update = NULL, local = F) +#' elasticizer(query, src = TRUE, index = "maml", update = NULL, localhost = F) ################################################################################################# #################################### Get data from ElasticSearch ################################ ################################################################################################# -elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassword("Elasticsearch READ"), update = NULL, local = F, ...){ - if (local == F) { +elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassword("Elasticsearch READ"), update = NULL, localhost = F, ...){ + httr::set_config(httr::config(http_version = 0)) + if (localhost == F) { connect(es_port = 443, es_transport = 'https', es_host = 'linux01.uis.no', @@ -25,7 +26,7 @@ elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassw es_pwd = es_pwd, errors = 'complete') } - if (local == T){ + if (localhost == T){ connect(es_port = 9200, es_transport = 'http', es_host = 'localhost', @@ -51,7 +52,7 @@ elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassw batch <- 1 print(paste0('Processing documents ',batch*1000-1000,' through ',batch*1000,' out of ',total,' documents.')) if (length(update) > 0){ - update(out, ...) + update(out, localhost = localhost, ...) } while(hits != 0){ res <- scroll(json$`_scroll_id`, time_scroll="5m", raw=T) @@ -62,7 +63,7 @@ elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassw print(paste0('Processing documents ',batch*1000-1000,' through ',batch*1000,' out of ',total,' documents.')) if (length(update) > 0){ out <- jsonlite:::flatten(json$hits$hits) - update(out, ...) + update(out, localhost = localhost, ...) 
} else { out <- bind_rows(out, jsonlite:::flatten(json$hits$hits)) } diff --git a/man/bulk_writer.Rd b/man/bulk_writer.Rd index 0b837c7..d8a03f9 100644 --- a/man/bulk_writer.Rd +++ b/man/bulk_writer.Rd @@ -4,20 +4,25 @@ \alias{bulk_writer} \title{Generate a line-delimited JSON string for use in Elasticsearch bulk updates} \usage{ -bulk_writer(x, index = "maml", varname = "updated_variable") +bulk_writer(x, index = "maml", varname = "updated_variable", type) } \arguments{ \item{x}{A single-row data frame, or a string containing the variables and/or values that should be updated (a data frame is converted to a JSON object, strings are stored as-is)} \item{index}{The name of the Elasticsearch index to update} -\item{varname}{String indicating the parent variable that should be updated (when it does not exist, it will be created)} +\item{varname}{String indicating the parent variable that should be updated (when it does not exist, it will be created, all varnames are prefixed by computerCodes)} + +\item{type}{Type of updating to be done, can be either 'set' or 'add'} } \value{ A string usable as Elasticsearch bulk update command, in line-delimited JSON } \description{ Generate a line-delimited JSON string for use in Elasticsearch bulk updates +Type can be one of two values: +set: set the value of [varname] to x +add: add x to the values of [varname] } \examples{ bulk_writer(x, index = 'maml', varname = 'updated_variable') diff --git a/man/class_update.Rd b/man/class_update.Rd index 12c5548..c8b56aa 100644 --- a/man/class_update.Rd +++ b/man/class_update.Rd @@ -4,11 +4,14 @@ \alias{class_update} \title{Classifier function for use in combination with the elasticizer function as 'update' parameter (without brackets), see elasticizer documentation for more information} \usage{ -class_update(out, model_final, dfm_words, varname, es_super) +class_update(out, localhost = T, model_final, dfm_words, varname, + es_super) } \arguments{ \item{out}{Does not need to 
be defined explicitly! (is already parsed in the elasticizer function)} +\item{localhost}{Defaults to true. When true, connect to a local Elasticsearch instance on the default port (9200)} + \item{model_final}{The classification model (e.g. output from textstat_nb(), svm() or others)} \item{dfm_words}{A dfm containing all the words and only the words used to generate the model (is used for subsetting)} @@ -24,5 +27,5 @@ As this is a nested function used within elasticizer, there is no return output Classifier function for use in combination with the elasticizer function as 'update' parameter (without brackets), see elasticizer documentation for more information } \examples{ -elasticizer(query, src = T, es_pwd = es_pwd, update = class_update, model_final = model_final, dfm_words = dfm_words, varname = computerCodes.varname, es_super = es_super) +class_update(out, localhost = T, model_final, dfm_words, varname, es_super) } diff --git a/man/elastic_update.Rd b/man/elastic_update.Rd index f7d6c40..3fac40a 100644 --- a/man/elastic_update.Rd +++ b/man/elastic_update.Rd @@ -4,14 +4,14 @@ \alias{elastic_update} \title{Push a line-delimited JSON string to Elasticsearch as bulk update} \usage{ -elastic_update(x, es_super = "secret", local = F) +elastic_update(x, es_super = "secret", localhost = T) } \arguments{ \item{x}{Line-delimited JSON suitable for use as Elasticsearch bulk update} \item{es_super}{The even-more-secret (do not store this anywhere!!!) password for updating (or messing up!) the entire database} -\item{local}{Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200)} +\item{localhost}{Defaults to true. 
When true, connect to a local Elasticsearch instance on the default port (9200)} } \value{ An html response object indicating the status of the update diff --git a/man/elasticizer.Rd b/man/elasticizer.Rd index ae7129c..0032426 100644 --- a/man/elasticizer.Rd +++ b/man/elasticizer.Rd @@ -6,7 +6,7 @@ \usage{ elasticizer(query, src = T, index = "maml", es_pwd = .rs.askForPassword("Elasticsearch READ"), update = NULL, - local = F, ...) + localhost = F, ...) } \arguments{ \item{query}{A JSON-formatted query in the Elasticsearch query DSL} @@ -17,9 +17,9 @@ elasticizer(query, src = T, index = "maml", \item{update}{When set, indicates an update function to use on each batch of 1000 articles} -\item{local}{Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200)} - \item{...}{Parameters passed on to the update function} + +\item{local}{Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200)} } \value{ A data frame containing all the search results @@ -28,5 +28,5 @@ A data frame containing all the search results Generate a data frame out of unparsed Elasticsearch JSON } \examples{ -elasticizer(query, src = TRUE, index = "maml", update = NULL, local = F) +elasticizer(query, src = TRUE, index = "maml", update = NULL, localhost = F) }