From 0d81d6fc7a30a8cc537830da0ec54df1ab6cde5f Mon Sep 17 00:00:00 2001
From: Erik de Vries
Date: Wed, 29 May 2019 19:16:13 +0200
Subject: [PATCH] added aggregator and aggregator_elastic functions for aggregating and storing article level actor aggregations

---
 R/aggregator.R | 39 ++++++++++++++++++++++++
 R/aggregator_elastic.R | 64 +++++++++++++++++++++++++++++++++++++++
 man/aggregator.Rd | 24 +++++++++++++++
 man/aggregator_elastic.Rd | 30 ++++++++++++++++++
 4 files changed, 157 insertions(+)
 create mode 100644 R/aggregator.R
 create mode 100644 R/aggregator_elastic.R
 create mode 100644 man/aggregator.Rd
 create mode 100644 man/aggregator_elastic.Rd

diff --git a/R/aggregator.R b/R/aggregator.R
new file mode 100644
index 0000000..101388b
--- /dev/null
+++ b/R/aggregator.R
@@ -0,0 +1,39 @@
+#' Aggregator function, to aggregate actor results
+#'
+#' Aggregates all actor rows belonging to a single article into one
+#' article-level measurement: occurrence count, relative prominence,
+#' relative position of first mention, and merged position markers.
+#' @param id Article id of the article for which actor aggregation should be done
+#' @param actor_df The dataframe containing the actor data
+#' @param merge_id The actorid that should be assigned to the merged result
+#' @return A dataframe with the merged results
+#' @export
+#' @examples
+#' aggregator(id, actor_df, merge_id)

+aggregator <- function(id, actor_df, merge_id) {
+  # Keep only this article's rows, then expand the nested sentence ids.
+  # NOTE(review): `.preserve = colnames(.)` is the old tidyr unnest API;
+  # it keeps the original list-columns alongside the unnested copy
+  # (suffix `1`, e.g. sentence_id1).
+  article <- filter(actor_df, `_id` == id) %>%
+    unnest(sentence_id, .preserve = colnames(.))
+
+  # Number of distinct sentences in which the actor occurs
+  occ <- length(unlist(unique(article$sentence_id1)))
+  # Reconstruct the article's total sentence count from the stored
+  # occ/prom pair (prom was originally occ / sentence_count)
+  sentence_count <- round(article$occ[[1]] / article$prom[[1]])
+  # Relative prominence: share of the article's sentences mentioning the actor
+  prom <- occ / sentence_count
+  # Relative position of first occurrence (1 = start of article)
+  rel_first <- 1 - (min(article$sentence_id1) / sentence_count)
+
+  # Merge and sort the position markers across all mentions of the actor
+  actor_start <- sort(unique(unlist(article$actor_start)))
+  actor_end <- sort(unique(unlist(article$actor_end)))
+  sentence_start <- sort(unique(unlist(article$sentence_start)))
+  sentence_end <- sort(unique(unlist(article$sentence_end)))
+  sentence_id <- sort(unique(unlist(article$sentence_id)))
+
+  return(data.frame(
+    doc_id = first(article$`_id`),
+    sentence_id = I(list(as.integer(sentence_id))),
+    sentence_start = I(list(sentence_start)),
+    sentence_end = I(list(sentence_end)),
+    actor_start = I(list(actor_start)),  # List of actor ud token start positions
+    actor_end = I(list(actor_end)),      # List of actor ud token end positions
+    occ = occ,                           # Number of sentences in which actor occurs
+    prom = prom,                         # Relative prominence (occurrences / total sentences)
+    rel_first = rel_first,               # Relative position of first occurrence at sentence level
+    first = min(article$sentence_id1),   # First sentence in which actor is mentioned
+    ids = merge_id,                      # Merged actor id assigned to this result
+    stringsAsFactors = FALSE             # was `F`; spell out FALSE (T/F are reassignable)
+  ))
+}
diff --git a/R/aggregator_elastic.R b/R/aggregator_elastic.R
new file mode 100644
index 0000000..9884f88
--- /dev/null
+++ b/R/aggregator_elastic.R
@@ -0,0 +1,64 @@
+### Notes:
+# Do you want to search for either one OR other actorid, or both occurring in the same document?
+# Do you want to keep only the occurrences of the actorids you are searching for, or all actor occurrences in the hits?
+# Search by actorId, then aggregate by month
+# When actorId starts with P_, define what hits you want to get (short, full, actor), if more than one, aggregate properly
+# Develop query generator for specific actors (ie combine actorId with start and end dates)



+#' Generate and store aggregate actor measures to elasticsearch
+#'
+#' Generate and store aggregate actor measures to elasticsearch
+#' @param out The output provided by elasticizer()
+#' @param localhost Boolean indicating if the script should run locally, or remote
+#' @param es_super Write password for ES
+#' @param actorids List of actorids used in the search, should be the same as the actorids used for elasticizer()
+#' @param ver String indicating the version of the update
+#' @param cores Numeric value indicating the number of cores to use for processing
+#' @return Return value is based on output of elastic_update()
+#' @export
+#' @examples
+#' aggregator_elastic(out, localhost = F, actorids, ver, cores,
es_super)
+#################################################################################################
+#################################### Aggregate actor results ################################
+#################################################################################################
+aggregator_elastic <- function(out, localhost = FALSE, actorids, ver, cores, es_super) {
+  ### Actor dataframe: unnest by actorsDetail, then by actor ids, and
+  ### keep only the actor ids that were part of the original search.
+  actor_df <- out %>%
+    unnest() %>%
+    unnest(ids, .preserve = colnames(.)) %>%
+    filter(ids1 %in% actorids)
+
+  # Base id for the merged results.
+  # NOTE(review): the original code read `actor$`_source.partyId`` here, but
+  # no object `actor` exists in this scope (runtime error). The partyId
+  # carried in actor_df is the only candidate — TODO confirm against caller.
+  party_id <- first(actor_df$`_source.partyId`)
+
+  # All mentions (party references and individual actors): suffix _mfsa
+  agg_party_actors <- bind_rows(mclapply(unique(actor_df$`_id`),
+                                         aggregator,
+                                         actor_df = actor_df,
+                                         merge_id = paste0(party_id, '_mfsa'),
+                                         mc.cores = cores))
+
+  # Party-level mentions only (actor ids NOT ending in '_a'): suffix _mfs
+  party <- actor_df %>%
+    filter(!endsWith(ids1, '_a'))
+  agg_party <- bind_rows(mclapply(unique(party$`_id`),
+                                  aggregator,
+                                  actor_df = party,
+                                  merge_id = paste0(party_id, '_mfs'),
+                                  mc.cores = cores))
+
+  # Individual actor mentions only (actor ids ending in '_a'): suffix _ma
+  actors_only <- actor_df %>%
+    filter(endsWith(ids1, '_a'))
+  agg_actors <- bind_rows(mclapply(unique(actors_only$`_id`),
+                                   aggregator,
+                                   actor_df = actors_only,
+                                   merge_id = paste0(party_id, '_ma'),
+                                   mc.cores = cores))
+
+  # Combine the three aggregation levels, then split per document so each
+  # document gets one bulk update containing all of its aggregate rows.
+  df_out <- bind_rows(agg_party_actors, agg_party, agg_actors)
+  doc_ids <- df_out$doc_id
+  df_out <- df_out %>%
+    select(-1) %>%
+    split(as.factor(doc_ids))
+  df_out <- data.frame(doc_id = names(df_out), list = I(df_out))
+
+  # Generate bulk update statements and push them to elasticsearch
+  bulk <- apply(df_out, 1, bulk_writer, varname = 'actorsDetail', type = 'add', ver = ver)
+  return(elastic_update(bulk, es_super = es_super, localhost = localhost))
+}

 diff --git a/man/aggregator.Rd b/man/aggregator.Rd
new file mode 100644
index 0000000..22841ef
--- /dev/null
+++ b/man/aggregator.Rd
@@ -0,0 +1,24 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/aggregator.R
+\name{aggregator}
+\alias{aggregator}
+\title{Aggregator function, to aggregate actor results}
+\usage{
+aggregator(id, actor_df, merge_id)
+}
+\arguments{
+\item{id}{Article id of the article for which actor aggregation should be done}
+
+\item{actor_df}{The dataframe containing the actor data}
+
+\item{merge_id}{The actorid that should be assigned to the merged result}
+}
+\value{
+A dataframe with the merged results
+}
+\description{
+Aggregator function, to aggregate actor results
+}
+\examples{
+aggregator(id, actor_df, merge_id)
+}
diff --git a/man/aggregator_elastic.Rd b/man/aggregator_elastic.Rd
new file mode 100644
index 0000000..89a2f44
--- /dev/null
+++ b/man/aggregator_elastic.Rd
@@ -0,0 +1,30 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/aggregator_elastic.R
+\name{aggregator_elastic}
+\alias{aggregator_elastic}
+\title{Generate and store aggregate actor measures to elasticsearch}
+\usage{
+aggregator_elastic(out, localhost = F, actorids, ver, cores, es_super)
+}
+\arguments{
+\item{out}{The output provided by elasticizer()}
+
+\item{localhost}{Boolean indicating if the script should run locally, or remote}
+
+\item{actorids}{List of actorids used in the search, should be the same as the actorids used for elasticizer()}
+
+\item{ver}{String indicating the version of the update}
+
+\item{cores}{Numeric value indicating the number of cores to use for processing}
+
+\item{es_super}{Write password for ES}
+}
+\value{
+Return value is based on output of elastic_update()
+}
+\description{
+Generate and store aggregate actor measures to elasticsearch
+}
+\examples{
+aggregator_elastic(out, localhost = F, actorids, ver, cores, es_super)
+}