elasticizer: Updated the scroll batch size to 1024 (a power of 2) and added a 900s pause every 500 batches (roughly every 512,000 documents)

query_gen_actors: Added an additional query generator for the "Institution" actor type (for EU support)
actorizer: Added an updater function that searches for actors and uses UDPipe to parse the resulting hits
branch master
Erik de Vries, 6 years ago
parent a3c3651c79
commit 39005c7518

@ -1,5 +1,6 @@
# Generated by roxygen2: do not edit by hand
export(actorizer)
export(bulk_writer)
export(class_update)
export(dfm_gen)

@ -0,0 +1,80 @@
#' Updater function for elasticizer: Conduct actor searches
#'
#' Updater function for elasticizer: Conduct actor searches
#' @param out Does not need to be defined explicitly! (is already parsed in the elasticizer function)
#' @param localhost Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200)
#' @param ids List of actor ids
#' @param prefix Regex containing prefixes that should be excluded from hits
#' @param postfix Regex containing postfixes that should be excluded from hits
#' @param identifier String used to mark highlights. Should be a lowercase string
#' @param udmodel The udpipe model used for parsing every hit
#' @param es_super Password for write access to ElasticSearch
#' @return As this is a nested function used within elasticizer, there is no return output
#' @export
#' @examples
#' actorizer(out, localhost = F, ids, type, prefix, postfix, identifier, udmodel, es_super)
actorizer <- function(out, localhost = F, ids, type, prefix, postfix, identifier, udmodel, es_super) {
fncols <- function(data, cname) {
# Add any columns named in cname that are missing from data (filled with NA),
# so the highlight/source replacements below can rely on them being present
add <- cname[!cname %in% names(data)]
if (length(add) != 0) data[add] <- NA
data
}
sentencizer <- function(row, out, udmodel, ids, prefix, postfix, identifier) {
### If prefix or postfix is missing or empty, substitute a regex that can never match,
### so the corresponding filter below keeps every sentence
if (is.na(prefix) || prefix == '') {
prefix = '$^'
}
if (is.na(postfix) || postfix == '') {
postfix = '$^'
}
doc <- out[row,]
ud <- as.data.frame(udpipe_annotate(udmodel, x = doc$merged, parser = "none", doc_id = doc$`_id`)) %>%
filter(upos != "PUNCT") # Remove punctuation to get accurate word counts
sentence_count <- length(unique(ud$sentence))
ud <- ud %>%
filter(grepl(identifier, sentence)) %>% # Keep only sentences that contain the identifier
filter(!str_detect(sentence, postfix)) %>% # Drop sentences with matching postfixes (false positives)
filter(!str_detect(sentence, prefix)) %>% # Drop sentences with matching prefixes (false positives)
filter(grepl(identifier, token)) %>% # Keep only tokens that contain the identifier
group_by(doc_id) %>%
summarise(
sentence_id = list(list(as.integer(sentence_id))),
token_id = list(list(as.integer(token_id))),
text = list(list(unique(as.character(sentence))))
)
occurrences <- length(unique(unlist(ud$sentence_id))) # Number of distinct sentences in which the actor occurs
prominence <- occurrences/sentence_count # Relative prominence of the actor in the article (occurrences / total sentences)
rel_first <- 1-(ud$sentence_id[[1]][[1]][1]/sentence_count) # Relative position of the first occurrence at sentence level
return(data.frame(ud, occ = occurrences, prom = prominence, rel_first = rel_first, ids = I(list(list(ids)))))
}
out <- fncols(out, c("highlight.text","highlight.title","highlight.teaser", "highlight.subtitle", "highlight.preteaser", '_source.text', '_source.title','_source.teaser','_source.subtitle','_source.preteaser'))
out <- replace(out, out=="NULL", NA)
### Replacing empty highlights with source text (to have the exact same text for udpipe to process)
out$highlight.title[is.na(out$highlight.title)] <- out$`_source.title`[is.na(out$highlight.title)]
out$highlight.text[is.na(out$highlight.text)] <- out$`_source.text`[is.na(out$highlight.text)]
out$highlight.teaser[is.na(out$highlight.teaser)] <- out$`_source.teaser`[is.na(out$highlight.teaser)]
out$highlight.subtitle[is.na(out$highlight.subtitle)] <- out$`_source.subtitle`[is.na(out$highlight.subtitle)]
out$highlight.preteaser[is.na(out$highlight.preteaser)] <- out$`_source.preteaser`[is.na(out$highlight.preteaser)]
out$merged <- str_c(str_replace_na(unlist(out$highlight.title), replacement = " "),
str_replace_na(unlist(out$highlight.subtitle), replacement = " "),
str_replace_na(unlist(out$highlight.preteaser), replacement = " "),
str_replace_na(unlist(out$highlight.teaser), replacement = " "),
str_replace_na(unlist(out$highlight.text), replacement = " "),
sep = " ") %>%
# Replace HTML tags with whitespace
str_replace_all("<.*?>", " ") %>%
str_replace_all("\\s+"," ")
ids <- fromJSON(ids)
updates <- bind_rows(mclapply(seq(1,length(out[[1]]),1), sentencizer, out = out, ids = ids, postfix = postfix, prefix=prefix, identifier=identifier, udmodel = udmodel, mc.cores = detectCores()))
bulk <- apply(updates, 1, bulk_writer, varname ='actorsDetail', type = 'add')
bulk <- c(bulk,apply(updates[c(1,8)], 1, bulk_writer, varname='actors', type = 'add'))
return(elastic_update(bulk, es_super = es_super, localhost = localhost))
}
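For intuition on the per-article metrics computed in sentencizer(), here is a small worked example with made-up numbers (not taken from this commit): suppose an article has 12 sentences and the actor is matched in 3 of them, the first match falling in sentence 2.

occurrences <- 3 # distinct sentences mentioning the actor
sentence_count <- 12 # sentences in the article
prominence <- occurrences/sentence_count # 0.25: share of sentences that mention the actor
rel_first <- 1 - (2/sentence_count) # ~0.83: earlier first mentions score closer to 1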

@ -4,6 +4,8 @@
#' @param query A JSON-formatted query in the Elasticsearch query DSL
#' @param src Logical (true/false) indicating whether or not the source of each document should be retrieved
#' @param index The name of the Elasticsearch index to search through
#' @param es_pwd The password for Elasticsearch read access
#' @param size Batch size for each Elasticsearch scroll request (defaults to 1024)
#' @param update When set, indicates an update function to use on each batch of articles (batch size is set by the size parameter)
#' @param localhost Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200)
#' @param ... Parameters passed on to the update function
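A minimal usage sketch of the updated interface; the query, actor ids, type, identifier and udpipe model path below are placeholders, not values taken from this commit:

model <- udpipe_load_model('dutch-alpino-ud-2.3-181115.udpipe') # placeholder model file
elasticizer(query = '{"query": {"match_all": {}}}', # placeholder query DSL
            index = 'maml',
            size = 1024, # new batch size argument
            update = actorizer, # updater run on every batch of hits
            ids = '["42_a"]', # placeholder JSON list of actor ids
            type = 'Individual', # placeholder
            prefix = '', postfix = '', # no false-positive filtering
            identifier = 'actorid', # placeholder highlight marker
            udmodel = model,
            es_super = .rs.askForPassword("Elasticsearch WRITE"))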
@ -15,7 +17,7 @@
#################################################################################################
#################################### Get data from ElasticSearch ################################
#################################################################################################
elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassword("Elasticsearch READ"), update = NULL, localhost = F, ...){
elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassword("Elasticsearch READ"), size = 1024, update = NULL, localhost = F, ...){
httr::set_config(httr::config(http_version = 0))
if (localhost == F) {
connect(es_port = 443,
@ -37,10 +39,10 @@ elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassw
}
# Get all results - one approach is to use a while loop
if (src == T) {
res <- Search(index = index, time_scroll="20m",body = query, size = 1000, raw=T)
res <- Search(index = index, time_scroll="20m",body = query, size = size, raw=T)
}
if (src == F) {
res <- Search(index = index, time_scroll="20m",body = query, size = 1000, raw=T, source = F)
res <- Search(index = index, time_scroll="20m",body = query, size = size, raw=T, source = F)
}
json <- fromJSON(res)
if (json$hits$total == 0) {
@ -50,7 +52,7 @@ elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassw
total <- json$hits$total
hits <- 1
batch <- 1
print(paste0('Processing documents ',batch*1000-1000,' through ',batch*1000,' out of ',total,' documents.'))
print(paste0('Processing documents ',batch*size-size,' through ',batch*size,' out of ',total,' documents.'))
if (length(update) > 0){
update(out, localhost = localhost, ...)
}
@ -60,11 +62,11 @@ elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassw
hits <- length(json$hits$hits)
if(hits > 0) {
batch <- batch+1
print(paste0('Processing documents ',batch*1000-1000,' through ',batch*1000,' out of ',total,' documents.'))
print(paste0('Processing documents ',batch*size-size,' through ',batch*size,' out of ',total,' documents.'))
if (length(update) > 0){
out <- jsonlite:::flatten(json$hits$hits)
update(out, localhost = localhost, ...)
if (batch%%200 == 0) {
if (batch%%500 == 0) {
Sys.sleep(900)
}
} else {

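With the defaults introduced in this commit, the 900-second pause in the scroll loop above is triggered every 500 batches, i.e. roughly every 512,000 documents:

size <- 1024
pause_every_batches <- 500 # batch %% 500 == 0 triggers Sys.sleep(900)
pause_every_batches * size # 512000 documents processed between pauses
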
@ -3,6 +3,7 @@
#' Generate actor search queries based on data in actor db
#' @param actor A row from the output of elasticizer() when run on the 'actor' index
#' @param country 2-letter string indicating the country for which to generate the queries, is related to inflected nouns, definitive forms and genitive forms of names etc.
#' @param identifier Identifier used to mark hits in the text, identifiers are prepended before the actual hit
#' @return A data frame containing the queries, related actor ids and actor function
#' @export
#' @examples
@ -11,7 +12,7 @@
#################################################################################################
#################################### Actor search query generator ###############################
#################################################################################################
query_gen_actors <- function(actor, country) {
query_gen_actors <- function(actor, country, identifier) {
highlight <- paste0('"highlight" : {
"fields" : {
"text" : {},
@ -31,6 +32,10 @@ if (country == "no") {
genitive <- 's'
definitive <- 'en'
definitive_genitive <- 'ens'
} else if (country == 'uk') {
genitive <- '\'s'
definitive <- 's'
definitive_genitive <- ''
} else {
genitive <- ''
definitive <- ''
@ -68,6 +73,82 @@ if (country == "no") {
query_string <- paste0(query_string,') OR (',lastname,' AND (',names,')))')
ids <- toJSON(unlist(lapply(c(actor$`_source.actorId`,actor$`_source.ministryId`,actor$`_source.partyId`),str_c, "_min")))
}
if (actor$`_source.function` == "Institution") {
#uppercasing
firstup <- function(x) {
substr(x, 1, 1) <- toupper(substr(x, 1, 1))
x
}
actor$`_source.startDate` <- "2000-01-01"
actor$`_source.endDate` <- "2099-01-01"
if (nchar(actor$`_source.institutionNameSearch`[[1]]) > 0) {
upper <- unlist(lapply(actor$`_source.institutionNameSearch`, firstup))
upper <- c(upper, unlist(lapply(upper, str_c, genitive)),
unlist(lapply(upper, str_c, definitive)),
unlist(lapply(upper, str_c, definitive_genitive)))
capital <- unlist(lapply(actor$`_source.institutionNameSearch`, str_to_title))
capital <- c(capital, unlist(lapply(capital, str_c, genitive)),
unlist(lapply(capital, str_c, definitive)),
unlist(lapply(capital, str_c, definitive_genitive)))
base <- actor$`_source.institutionNameSearch`
base <- c(base, unlist(lapply(base, str_c, genitive)),
unlist(lapply(base, str_c, definitive)),
unlist(lapply(base, str_c, definitive_genitive)))
names <- paste(unique(c(upper,capital,base)), collapse = '\\" \\"')
query_string <- paste0('(\\"',names,'\\")')
ids <- toJSON(unlist(lapply(c(actor$`_source.institutionId`),str_c, "_f")))
query <- paste0('{"query":
{"bool": {"filter":[{"term":{"country":"',country,'"}},
{"range":{"publication_date":{"gte":"',actor$`_source.startDate`,'","lte":"',actor$`_source.endDate`,'"}}},
{"query_string" : {
"default_operator" : "OR",
"allow_leading_wildcard" : "false",
"fields": ["text","teaser","preteaser","title","subtitle"],
"query" : "', query_string,'"
}
}
]
} },',highlight,' }')
df1 <- data.frame(query = query, ids = I(ids), type = actor$`_source.function`, stringsAsFactors = F)
}
if (nchar(actor$`_source.institutionNameSearchShort`[[1]]) > 0) {
upper <- unlist(lapply(actor$`_source.institutionNameSearchShort`, firstup))
upper <- c(upper, unlist(lapply(upper, str_c, genitive)),
unlist(lapply(upper, str_c, definitive)),
unlist(lapply(upper, str_c, definitive_genitive)))
capital <- unlist(lapply(actor$`_source.institutionNameSearchShort`, str_to_title))
capital <- c(capital, unlist(lapply(capital, str_c, genitive)),
unlist(lapply(capital, str_c, definitive)),
unlist(lapply(capital, str_c, definitive_genitive)))
base <- actor$`_source.institutionNameSearchShort`
base <- c(base, unlist(lapply(base, str_c, genitive)),
unlist(lapply(base, str_c, definitive)),
unlist(lapply(base, str_c, definitive_genitive)))
names <- paste(unique(c(upper,capital,base)), collapse = '\\" \\"')
query_string <- paste0('(\\"',names,'\\")')
ids <- toJSON(unlist(lapply(c(actor$`_source.institutionId`),str_c, "_s")))
query <- paste0('{"query":
{"bool": {"filter":[{"term":{"country":"',country,'"}},
{"range":{"publication_date":{"gte":"',actor$`_source.startDate`,'","lte":"',actor$`_source.endDate`,'"}}},
{"query_string" : {
"default_operator" : "OR",
"allow_leading_wildcard" : "false",
"fields": ["text","teaser","preteaser","title","subtitle"],
"query" : "', query_string,'"
}
}
]
} },',highlight,' }')
df2 <- data.frame(query = query, ids = I(ids), type = actor$`_source.function`, stringsAsFactors = F)
}
if (exists('df1') == T & exists('df2') == T) {
return(bind_rows(df1,df2))
} else if (exists('df1') == T) {
return(df1)
} else if (exists('df2') == T) {
return(df2)
}
}
if (actor$`_source.function` == "Party") {
actor$`_source.startDate` <- "2000-01-01"
actor$`_source.endDate` <- "2099-01-01"

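The two Institution blocks above apply the same name-variant expansion to the full and short institution names. As a possible refactoring, here is a sketch of a shared helper (the name inflect_variants is hypothetical and not part of this commit) that reproduces the uppercased, title-cased and original forms plus the genitive and definitive suffixes; it assumes stringr is loaded, as elsewhere in the package:

inflect_variants <- function(names_in, genitive, definitive, definitive_genitive) {
firstup <- function(x) { # uppercase only the first character
substr(x, 1, 1) <- toupper(substr(x, 1, 1))
x
}
expand <- function(base) { # append the country-specific suffixes to each name form
unique(c(base, str_c(base, genitive), str_c(base, definitive), str_c(base, definitive_genitive)))
}
unique(c(expand(unlist(lapply(names_in, firstup))), # first letter uppercased
expand(unlist(lapply(names_in, str_to_title))), # every word title-cased
expand(names_in))) # original spelling
}
# Hypothetical usage with the 'uk' inflections added above:
# inflect_variants(c("european commission"), genitive = "'s", definitive = "s", definitive_genitive = "")
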
@ -0,0 +1,35 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/actorizer.R
\name{actorizer}
\alias{actorizer}
\title{Updater function for elasticizer: Conduct actor searches}
\usage{
actorizer(out, localhost = F, ids, type, prefix, postfix, identifier,
udmodel, es_super)
}
\arguments{
\item{out}{Does not need to be defined explicitly! (is already parsed in the elasticizer function)}
\item{localhost}{Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200)}
\item{ids}{List of actor ids}
\item{prefix}{Regex containing prefixes that should be excluded from hits}
\item{postfix}{Regex containing postfixes that should be excluded from hits}
\item{identifier}{String used to mark highlights. Should be a lowercase string}
\item{udmodel}{The udpipe model used for parsing every hit}
\item{es_super}{Password for write access to ElasticSearch}
}
\value{
As this is a nested function used within elasticizer, there is no return output
}
\description{
Updater function for elasticizer: Conduct actor searches
}
\examples{
actorizer(out, localhost = F, ids, type, prefix, postfix, identifier, udmodel, es_super)
}

@ -23,7 +23,7 @@ Generate a line-delimited JSON string for use in Elasticsearch bulk updates
Type can be either one of three values:
set: set the value of [varname] to x
add: add x to the values of [varname]
varname: When using tokens, the token field will be updated instead of a computerCodes field
varname: When using ud, the ud field will be updated instead of a computerCodes field
}
\examples{
bulk_writer(x, index = 'maml', varname = 'updated_variable')

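For context, this is how the new actorizer() updater uses the 'add' type documented above: per-document actor details and the bare actor ids are both appended, then pushed in one bulk request (lines taken from actorizer() in this commit):

bulk <- apply(updates, 1, bulk_writer, varname = 'actorsDetail', type = 'add')
bulk <- c(bulk, apply(updates[c(1,8)], 1, bulk_writer, varname = 'actors', type = 'add'))
elastic_update(bulk, es_super = es_super, localhost = localhost)
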
@ -5,8 +5,8 @@
\title{Generate a data frame out of unparsed Elasticsearch JSON}
\usage{
elasticizer(query, src = T, index = "maml",
es_pwd = .rs.askForPassword("Elasticsearch READ"), update = NULL,
localhost = F, ...)
es_pwd = .rs.askForPassword("Elasticsearch READ"), size = 1024,
update = NULL, localhost = F, ...)
}
\arguments{
\item{query}{A JSON-formatted query in the Elasticsearch query DSL}
@ -15,6 +15,10 @@ elasticizer(query, src = T, index = "maml",
\item{index}{The name of the Elasticsearch index to search through}
\item{es_pwd}{The password for Elasticsearch read access}
\item{size}{Batch size for each Elasticsearch scroll request (defaults to 1024)}
\item{update}{When set, indicates an update function to use on each batch of articles (batch size is set by the size parameter)}
\item{...}{Parameters passed on to the update function}

@ -4,12 +4,14 @@
\alias{query_gen_actors}
\title{Generate actor search queries based on data in actor db}
\usage{
query_gen_actors(actor, country)
query_gen_actors(actor, country, identifier)
}
\arguments{
\item{actor}{A row from the output of elasticizer() when run on the 'actor' index}
\item{country}{2-letter string indicating the country for which to generate the queries, is related to inflected nouns, definitive forms and genitive forms of names etc.}
\item{identifier}{Identifier used to mark hits in the text, identifiers are prepended before the actual hit}
}
\value{
A data frame containing the queries, related actor ids and actor function

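A minimal usage sketch tying the pieces together; the actor row, country and identifier are placeholders (in practice actor is one row of the data frame returned by elasticizer() on the 'actor' index):

queries <- query_gen_actors(actor, country = 'uk', identifier = 'actorid')
# Each returned row holds a query plus the related actor ids; the query can then be run through
# elasticizer(), with actorizer() as the update function, to tag the matching articles.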