Major overhaul to ES bulk update integration. Added support for both setting and appending to variables

master
Erik de Vries 6 years ago
parent 4adae2bbc6
commit f543d658bd

@ -1,9 +1,13 @@
#' Generate a line-delimited JSON string for use in Elasticsearch bulk updates #' Generate a line-delimited JSON string for use in Elasticsearch bulk updates
#' #'
#' Generate a line-delimited JSON string for use in Elasticsearch bulk updates #' Generate a line-delimited JSON string for use in Elasticsearch bulk updates
#' Type can be either one of three values:
#' set: set the value of [varname] to x
#' add: add x to the values of [varname]
#' @param x A single-row data frame, or a string containing the variables and/or values that should be updated (a data frame is converted to a JSON object, strings are stored as-is) #' @param x A single-row data frame, or a string containing the variables and/or values that should be updated (a data frame is converted to a JSON object, strings are stored as-is)
#' @param index The name of the Elasticsearch index to update #' @param index The name of the Elasticsearch index to update
#' @param varname String indicating the parent variable that should be updated (when it does not exist, it will be created) #' @param varname String indicating the parent variable that should be updated (when it does not exist, it will be created, all varnames arexed by computerCodes)
#' @param type Type of updating to be done, can be either 'set', 'add', or 'addnested'
#' @return A string usable as Elasticsearch bulk update command, in line-delimited JSON #' @return A string usable as Elasticsearch bulk update command, in line-delimited JSON
#' @export #' @export
#' @examples #' @examples
@ -11,9 +15,24 @@
################################################################################################# #################################################################################################
#################################### Bulk update writer ################################ #################################### Bulk update writer ################################
################################################################################################# #################################################################################################
bulk_writer <- function(x, index = 'maml', varname = 'updated_variable') { bulk_writer <- function(x, index = 'maml', varname = 'updated_variable', type) {
return( ### Create a json object if more than one variable besides _id, otherwise use value as-is
paste0('{"update": {"_index": "',index,'", "_type": "doc", "_id": "',x[1],'"}} if (length(x) > 2) {
{ "script" : { "source": "ctx._source.',varname,' = params.code", "lang" : "painless","params" : {"code":',toJSON(x[-1], collapse = F),'}}}') json <- toJSON(bind_rows(x[-1]), collapse = T)
) } else {
json <- toJSON(x[-1], collapse = T)
}
if (type == 'set') {
return(
paste0('{"update": {"_index": "',index,'", "_type": "doc", "_id": "',x[1],'"}}
{ "script" : { "source": "if (ctx._source.computerCodes != null) {ctx._source.computerCodes.',varname,' = params.code} else {ctx._source.computerCodes = params.object}", "lang" : "painless", "params": { "code": ',json,', "object": {"',varname,'": ',json,'} }}}')
)
}
if (type == "add") {
return(
paste0('{"update": {"_index": "',index,'", "_type": "doc", "_id": "',x[1],'"}}
{"script": {"source": "if (ctx._source.computerCodes != null && ctx._source.computerCodes.containsKey(\\"',varname,'\\")) {ctx._source.computerCodes.',varname,'.addAll(params.code)} else if (ctx._source.computerCodes != null) {ctx._source.computerCodes.',varname,' = params.code} else {ctx._source.computerCodes = params.object}", "lang" : "painless", "params": { "code": ',json,' , "object": {"',varname,'": ',json,'}}}}'
)
)
}
} }

@ -2,6 +2,7 @@
#' #'
#' Classifier function for use in combination with the elasticizer function as 'update' parameter (without brackets), see elasticizer documentation for more information #' Classifier function for use in combination with the elasticizer function as 'update' parameter (without brackets), see elasticizer documentation for more information
#' @param out Does not need to be defined explicitly! (is already parsed in the elasticizer function) #' @param out Does not need to be defined explicitly! (is already parsed in the elasticizer function)
#' @param localhost Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200)
#' @param model_final The classification model (e.g. output from textstat_nb(), svm() or others) #' @param model_final The classification model (e.g. output from textstat_nb(), svm() or others)
#' @param dfm_words A dfm containing all the words and only the words used to generate the model (is used for subsetting) #' @param dfm_words A dfm containing all the words and only the words used to generate the model (is used for subsetting)
#' @param varname String containing the variable name to use for the classification result, usually has the format computerCodes.varname #' @param varname String containing the variable name to use for the classification result, usually has the format computerCodes.varname
@ -9,24 +10,22 @@
#' @return As this is a nested function used within elasticizer, there is no return output #' @return As this is a nested function used within elasticizer, there is no return output
#' @export #' @export
#' @examples #' @examples
#' elasticizer(query, src = T, es_pwd = es_pwd, update = class_update, model_final = model_final, dfm_words = dfm_words, varname = computerCodes.varname, es_super = es_super) #' class_update(out, localhost = T, model_final, dfm_words, varname, es_super)
################################################################################################# #################################################################################################
#################################### Update any kind of classification ########################## #################################### Update any kind of classification ##########################
################################################################################################# #################################################################################################
class_update <- function(out, model_final, dfm_words, varname, es_super) { class_update <- function(out, localhost = T, model_final, dfm_words, varname, es_super) {
print('updating') print('updating')
dfm <- dfm_gen(out, text = 'lemmas') %>% dfm <- dfm_gen(out, text = 'lemmas') %>%
dfm_keep(dfm_words, valuetype="fixed", verbose=T) dfm_keep(dfm_words, valuetype="fixed", verbose=T)
pred <- data.frame(id = out$`_id`, pred = predict(model_final, newdata = dfm)) pred <- data.frame(id = out$`_id`, pred = predict(model_final, newdata = dfm))
bulk <- apply(pred, 1, bulk_writer, varname = varname) bulk <- apply(pred, 1, bulk_writer, varname = varname, type = 'set')
res <- elastic_update(bulk, es_super = es_super) res <- elastic_update(bulk, es_super = es_super, localhost = localhost)
stop_for_status(res) stop_for_status(res)
content(res, "parsed", "application/json")
appData <- content(res) appData <- content(res)
if (appData$errors == T){ if (appData$errors == T){
print(appData) print(appData)
stop("Aborting, errors found during updating") stop("Aborting, errors found during updating")
} }
print("updated") print("updated")
Sys.sleep(1)
} }

@ -3,7 +3,7 @@
#' Push a line-delimited JSON string to Elasticsearch as bulk update #' Push a line-delimited JSON string to Elasticsearch as bulk update
#' @param x Line-delimited JSON suitable for use as Elasticsearch bulk update #' @param x Line-delimited JSON suitable for use as Elasticsearch bulk update
#' @param es_super The even-more-secret (do not store this anywhere!!!) password for updating (or messing up!) the entire database #' @param es_super The even-more-secret (do not store this anywhere!!!) password for updating (or messing up!) the entire database
#' @param local Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200) #' @param localhost Defaults to true. When true, connect to a local Elasticsearch instance on the default port (9200)
#' @return An html response object indicating the status of the update #' @return An html response object indicating the status of the update
#' @export #' @export
#' @examples #' @examples
@ -12,12 +12,12 @@
################################################################################################# #################################################################################################
#################################### Elasticsearch Updater ################################ #################################### Elasticsearch Updater ################################
################################################################################################# #################################################################################################
elastic_update <- function(x, es_super = 'secret', local = F) { elastic_update <- function(x, es_super = 'secret', localhost = T) {
bulk <- paste0(x,'\n') bulk <- paste0(paste0(x, collapse = '\n'),'\n')
if (local == F) { if (localhost == F) {
url <- paste0('https://super:',es_super,'@linux01.uis.no/es/_bulk?pretty&refresh=wait_for') url <- paste0('https://super:',es_super,'@linux01.uis.no/es/_bulk?pretty&refresh=wait_for')
} }
if (local == T) { if (localhost == T) {
url <- 'http://localhost:9200/_bulk?pretty' url <- 'http://localhost:9200/_bulk?pretty'
} }
res <- RETRY("POST", url = url res <- RETRY("POST", url = url
@ -27,8 +27,5 @@ elastic_update <- function(x, es_super = 'secret', local = F) {
, times = 10 , times = 10
, pause_min = 10 , pause_min = 10
) )
# stop_for_status(res)
# content(res, "parsed", "application/json")
# appData <- content(res)
return(res) return(res)
} }

@ -11,12 +11,13 @@
#' @return A data frame containing all the search results #' @return A data frame containing all the search results
#' @export #' @export
#' @examples #' @examples
#' elasticizer(query, src = TRUE, index = "maml", update = NULL, local = F) #' elasticizer(query, src = TRUE, index = "maml", update = NULL, localhost = F)
################################################################################################# #################################################################################################
#################################### Get data from ElasticSearch ################################ #################################### Get data from ElasticSearch ################################
################################################################################################# #################################################################################################
elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassword("Elasticsearch READ"), update = NULL, local = F, ...){ elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassword("Elasticsearch READ"), update = NULL, localhost = F, ...){
if (local == F) { httr::set_config(httr::config(http_version = 0))
if (localhost == F) {
connect(es_port = 443, connect(es_port = 443,
es_transport = 'https', es_transport = 'https',
es_host = 'linux01.uis.no', es_host = 'linux01.uis.no',
@ -25,7 +26,7 @@ elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassw
es_pwd = es_pwd, es_pwd = es_pwd,
errors = 'complete') errors = 'complete')
} }
if (local == T){ if (localhost == T){
connect(es_port = 9200, connect(es_port = 9200,
es_transport = 'http', es_transport = 'http',
es_host = 'localhost', es_host = 'localhost',
@ -51,7 +52,7 @@ elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassw
batch <- 1 batch <- 1
print(paste0('Processing documents ',batch*1000-1000,' through ',batch*1000,' out of ',total,' documents.')) print(paste0('Processing documents ',batch*1000-1000,' through ',batch*1000,' out of ',total,' documents.'))
if (length(update) > 0){ if (length(update) > 0){
update(out, ...) update(out, localhost = localhost, ...)
} }
while(hits != 0){ while(hits != 0){
res <- scroll(json$`_scroll_id`, time_scroll="5m", raw=T) res <- scroll(json$`_scroll_id`, time_scroll="5m", raw=T)
@ -62,7 +63,7 @@ elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassw
print(paste0('Processing documents ',batch*1000-1000,' through ',batch*1000,' out of ',total,' documents.')) print(paste0('Processing documents ',batch*1000-1000,' through ',batch*1000,' out of ',total,' documents.'))
if (length(update) > 0){ if (length(update) > 0){
out <- jsonlite:::flatten(json$hits$hits) out <- jsonlite:::flatten(json$hits$hits)
update(out, ...) update(out, localhost = localhost, ...)
} else { } else {
out <- bind_rows(out, jsonlite:::flatten(json$hits$hits)) out <- bind_rows(out, jsonlite:::flatten(json$hits$hits))
} }

@ -4,20 +4,25 @@
\alias{bulk_writer} \alias{bulk_writer}
\title{Generate a line-delimited JSON string for use in Elasticsearch bulk updates} \title{Generate a line-delimited JSON string for use in Elasticsearch bulk updates}
\usage{ \usage{
bulk_writer(x, index = "maml", varname = "updated_variable") bulk_writer(x, index = "maml", varname = "updated_variable", type)
} }
\arguments{ \arguments{
\item{x}{A single-row data frame, or a string containing the variables and/or values that should be updated (a data frame is converted to a JSON object, strings are stored as-is)} \item{x}{A single-row data frame, or a string containing the variables and/or values that should be updated (a data frame is converted to a JSON object, strings are stored as-is)}
\item{index}{The name of the Elasticsearch index to update} \item{index}{The name of the Elasticsearch index to update}
\item{varname}{String indicating the parent variable that should be updated (when it does not exist, it will be created)} \item{varname}{String indicating the parent variable that should be updated (when it does not exist, it will be created, all varnames arexed by computerCodes)}
\item{type}{Type of updating to be done, can be either 'set', 'add', or 'addnested'}
} }
\value{ \value{
A string usable as Elasticsearch bulk update command, in line-delimited JSON A string usable as Elasticsearch bulk update command, in line-delimited JSON
} }
\description{ \description{
Generate a line-delimited JSON string for use in Elasticsearch bulk updates Generate a line-delimited JSON string for use in Elasticsearch bulk updates
Type can be either one of three values:
set: set the value of [varname] to x
add: add x to the values of [varname]
} }
\examples{ \examples{
bulk_writer(x, index = 'maml', varname = 'updated_variable') bulk_writer(x, index = 'maml', varname = 'updated_variable')

@ -4,11 +4,14 @@
\alias{class_update} \alias{class_update}
\title{Classifier function for use in combination with the elasticizer function as 'update' parameter (without brackets), see elasticizer documentation for more information} \title{Classifier function for use in combination with the elasticizer function as 'update' parameter (without brackets), see elasticizer documentation for more information}
\usage{ \usage{
class_update(out, model_final, dfm_words, varname, es_super) class_update(out, localhost = T, model_final, dfm_words, varname,
es_super)
} }
\arguments{ \arguments{
\item{out}{Does not need to be defined explicitly! (is already parsed in the elasticizer function)} \item{out}{Does not need to be defined explicitly! (is already parsed in the elasticizer function)}
\item{localhost}{Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200)}
\item{model_final}{The classification model (e.g. output from textstat_nb(), svm() or others)} \item{model_final}{The classification model (e.g. output from textstat_nb(), svm() or others)}
\item{dfm_words}{A dfm containing all the words and only the words used to generate the model (is used for subsetting)} \item{dfm_words}{A dfm containing all the words and only the words used to generate the model (is used for subsetting)}
@ -24,5 +27,5 @@ As this is a nested function used within elasticizer, there is no return output
Classifier function for use in combination with the elasticizer function as 'update' parameter (without brackets), see elasticizer documentation for more information Classifier function for use in combination with the elasticizer function as 'update' parameter (without brackets), see elasticizer documentation for more information
} }
\examples{ \examples{
elasticizer(query, src = T, es_pwd = es_pwd, update = class_update, model_final = model_final, dfm_words = dfm_words, varname = computerCodes.varname, es_super = es_super) class_update(out, localhost = T, model_final, dfm_words, varname, es_super)
} }

@ -4,14 +4,14 @@
\alias{elastic_update} \alias{elastic_update}
\title{Push a line-delimited JSON string to Elasticsearch as bulk update} \title{Push a line-delimited JSON string to Elasticsearch as bulk update}
\usage{ \usage{
elastic_update(x, es_super = "secret", local = F) elastic_update(x, es_super = "secret", localhost = T)
} }
\arguments{ \arguments{
\item{x}{Line-delimited JSON suitable for use as Elasticsearch bulk update} \item{x}{Line-delimited JSON suitable for use as Elasticsearch bulk update}
\item{es_super}{The even-more-secret (do not store this anywhere!!!) password for updating (or messing up!) the entire database} \item{es_super}{The even-more-secret (do not store this anywhere!!!) password for updating (or messing up!) the entire database}
\item{local}{Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200)} \item{localhost}{Defaults to true. When true, connect to a local Elasticsearch instance on the default port (9200)}
} }
\value{ \value{
An html response object indicating the status of the update An html response object indicating the status of the update

@ -6,7 +6,7 @@
\usage{ \usage{
elasticizer(query, src = T, index = "maml", elasticizer(query, src = T, index = "maml",
es_pwd = .rs.askForPassword("Elasticsearch READ"), update = NULL, es_pwd = .rs.askForPassword("Elasticsearch READ"), update = NULL,
local = F, ...) localhost = F, ...)
} }
\arguments{ \arguments{
\item{query}{A JSON-formatted query in the Elasticsearch query DSL} \item{query}{A JSON-formatted query in the Elasticsearch query DSL}
@ -17,9 +17,9 @@ elasticizer(query, src = T, index = "maml",
\item{update}{When set, indicates an update function to use on each batch of 1000 articles} \item{update}{When set, indicates an update function to use on each batch of 1000 articles}
\item{local}{Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200)}
\item{...}{Parameters passed on to the update function} \item{...}{Parameters passed on to the update function}
\item{local}{Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200)}
} }
\value{ \value{
A data frame containing all the search results A data frame containing all the search results
@ -28,5 +28,5 @@ A data frame containing all the search results
Generate a data frame out of unparsed Elasticsearch JSON Generate a data frame out of unparsed Elasticsearch JSON
} }
\examples{ \examples{
elasticizer(query, src = TRUE, index = "maml", update = NULL, local = F) elasticizer(query, src = TRUE, index = "maml", update = NULL, localhost = F)
} }

Loading…
Cancel
Save