### Notes:
# Do you want to search for either one OR other actorid, or both occuring in the same document?
# Do you want to keep only the occurences of the actorids you are searching for, or all actor occurences in the hits?
# Search by actorId, then aggregate by month
# When actorId starts with P_, define what hits you want to get (short, full, actor), if more than one, aggregate properly
# Develop query generator for specific actors (ie combine actorId with start and end dates)
#' Generate aggregated actor measures from raw data
#'
#' Generate aggregated actor measures from raw data
#' @param row The row of the actors data frame used for aggregation
#' @param actors The data frame containing actor data
#' @param es_pwd The password for read access to ES
#' @param localhost Boolean indicating if the script is running locally or not
#' @param default_operator String indicating whether actor aggregations should be made by searching for the presence of any of the actor ids (OR), or all of them (AND). Defaults to OR
#' @return No return value, data per actor is saved in an RDS file
#' @export
#' @examples
#' actor_aggregation(row, actors, es_pwd, localhost, default_operator = 'OR')
#################################################################################################
#################################### Aggregate actor results ################################
#################################################################################################
actor_aggregation <- function ( row , actors , es_pwd , localhost , default_operator = ' OR' , sent_dict = NULL , cores = detectCores ( ) ) {
### Functions
aggregator <- function ( id , duplicates ) {
df <- duplicates %>%
filter ( `_id` == id ) %>%
group_by ( `_id` ) %>%
summarise (
`_source.doctype` = first ( `_source.doctype` ) ,
`_source.publication_date` = first ( `_source.publication_date` ) ,
# actor_end = list(sort(unique(unlist(actor_end)))),
prom = list ( length ( unique ( unlist ( sentence_id ) ) ) / round ( occ [ [1 ] ] / prom [ [1 ] ] ) ) ,
sentence_id = list ( sort ( unique ( unlist ( sentence_id ) ) ) ) ,
rel_first = list ( max ( unlist ( rel_first ) ) ) ,
# sentence_end = list(sort(unique(unlist(sentence_end)))),
# actor_start = list(sort(unique(unlist(actor_start)))),
ids = list ( unique ( unlist ( ids ) ) ) ,
# sentence_start = list(sort(unique(unlist(sentence_start)))),
occ = list ( length ( unique ( unlist ( sentence_id ) ) ) ) ,
first = list ( min ( unlist ( sentence_id ) ) )
)
return ( df )
}
### Calculate sentiment scores for each actor-document
par_sent <- function ( row , out , sent_dict ) {
out_row <- out [row , ]
### Contains sentiment per sentence for whole article
sentiment_ud <- out_row $ `_source.ud` [ [1 ] ] %>%
select ( - one_of ( ' exists' ) ) %>%
unnest ( ) %>%
filter ( upos != ' PUNCT' ) %>% # For getting proper word counts
mutate ( V1 = str_c ( lemma , ' _' , upos ) ) %>%
left_join ( sent_dict , by = ' V1' ) %>%
### Setting binary sentiment as unit of analysis
mutate ( V2 = V3 ) %>%
group_by ( sentence_id ) %>%
mutate (
V2 = case_when (
is.na ( V2 ) == T ~ 0 ,
TRUE ~ V2
)
) %>%
summarise ( sent_sum = sum ( V2 ) ,
words = length ( lemma ) ,
sent_words = length ( na.omit ( V3 ) ) ) %>%
mutate (
sent = sent_sum / words ,
arousal = sent_words / words
)
out_row <- select ( out_row , - `_source.ud` )
### Contains sentiment per sentence for actor
actor_tone <- filter ( sentiment_ud , sentence_id %in% unlist ( out_row $ sentence_id ) )
### Aggregated sentiment per actor (over all sentences containing actor)
actor <- summarise ( actor_tone ,
sent = sum ( sent_sum ) / sum ( words ) ,
sent_sum = sum ( sent_sum ) ,
sent_words = sum ( sent_words ) ,
words = sum ( words ) ,
arousal = sum ( sent_words ) / sum ( words )
)
### Aggregated sentiment per article (over all sentences in article)
text <- summarise ( sentiment_ud ,
sent = sum ( sent_sum ) / sum ( words ) ,
sent_sum = sum ( sent_sum ) ,
sent_words = sum ( sent_words ) ,
words = sum ( words ) ,
arousal = sum ( sent_words ) / sum ( words )
)
return ( cbind ( out_row , data.frame ( actor = actor , text = text ) ) )
}
### Creating aggregate measuers at daily, weekly, monthly and yearly level
grouper <- function ( level , out , actorids , sent = F ) {
by_newspaper <- out %>%
mutate (
sentence_count = round ( unlist ( occ ) / unlist ( prom ) )
) %>%
group_by_at ( vars ( level , `_source.doctype` ) ) %>%
summarise (
occ = sum ( unlist ( occ ) ) ,
prom_art = mean ( unlist ( prom ) ) ,
rel_first_art = mean ( unlist ( rel_first ) ) ,
first = mean ( unlist ( first ) ) ,
sentence_count = sum ( sentence_count ) ,
articles = length ( `_id` ) ,
level = level
) %>%
ungroup ( )
aggregate <- out %>%
mutate (
sentence_count = round ( unlist ( occ ) / unlist ( prom ) )
) %>%
group_by_at ( vars ( level ) ) %>%
summarise (
occ = sum ( unlist ( occ ) ) ,
prom_art = mean ( unlist ( prom ) ) ,
rel_first_art = mean ( unlist ( rel_first ) ) ,
first = mean ( unlist ( first ) ) ,
sentence_count = sum ( sentence_count ) ,
articles = length ( `_id` ) ,
`_source.doctype` = ' agg' ,
level = level
) %>%
ungroup ( )
if ( sent == T ) {
by_newspaper_sent <- out %>%
group_by_at ( vars ( level , `_source.doctype` ) ) %>%
summarise (
actor.sent = mean ( actor.sent ) ,
actor.sent_sum = sum ( actor.sent_sum ) ,
actor.sent_words = sum ( actor.sent_words ) ,
actor.words = sum ( actor.words ) ,
actor.arousal = mean ( actor.arousal ) ,
text.sent = mean ( text.sent ) ,
text.sent_sum = sum ( text.sent_sum ) ,
text.sent_words = sum ( text.sent_words ) ,
text.words = sum ( text.words ) ,
text.arousal = mean ( text.arousal )
) %>%
ungroup ( ) %>%
select ( - level , - `_source.doctype` )
aggregate_sent <- out %>%
group_by_at ( vars ( level ) ) %>%
summarise (
actor.sent = mean ( actor.sent ) ,
actor.sent_sum = sum ( actor.sent_sum ) ,
actor.sent_words = sum ( actor.sent_words ) ,
actor.words = sum ( actor.words ) ,
actor.arousal = mean ( actor.arousal ) ,
text.sent = mean ( text.sent ) ,
text.sent_sum = sum ( text.sent_sum ) ,
text.sent_words = sum ( text.sent_words ) ,
text.words = sum ( text.words ) ,
text.arousal = mean ( text.arousal )
) %>%
ungroup ( ) %>%
select ( - level )
aggregate <- bind_cols ( aggregate , aggregate_sent )
by_newspaper <- bind_cols ( by_newspaper , by_newspaper_sent )
}
output <- bind_rows ( by_newspaper , aggregate ) %>%
bind_cols ( .,bind_rows ( actor ) [rep ( seq_len ( nrow ( bind_rows ( actor ) ) ) , each = nrow ( .) ) , ] ) %>%
select (
- `_index` ,
- `_type` ,
- `_score` ,
- `_id` ,
- contains ( ' Search' ) ,
- contains ( ' not' )
)
colnames ( output ) <- gsub ( " _source." , ' ' , colnames ( output ) )
return ( output )
}
###########################################################################################
plan ( multiprocess , workers = cores )
if ( is.null ( sent_dict ) == F ) {
fields <- c ( ' ud' , ' computerCodes.actorsDetail' , ' doctype' , ' publication_date' )
} else {
fields <- c ( ' computerCodes.actorsDetail' , ' doctype' , ' publication_date' )
}
actor <- actors [row , ]
if ( actor $ `_source.function` == " Party" ) {
years = seq ( 2000 , 2019 , 1 )
startDate <- ' 2000'
endDate <- ' 2019'
} else {
years = c ( 0 )
startDate <- actor $ `_source.startDate`
endDate <- actor $ `_source.endDate`
}
if ( actor $ `_source.function` == ' Party' && actor $ party_only == T ) {
actorids <- c ( paste0 ( actor $ `_source.partyId` , ' _s' ) , paste0 ( actor $ `_source.partyId` , ' _f' ) )
} else if ( actor $ `_source.function` == ' Party' ) {
actorids <- c ( paste0 ( actor $ `_source.partyId` , ' _s' ) , paste0 ( actor $ `_source.partyId` , ' _f' ) , paste0 ( actor $ `_source.partyId` , ' _a' ) )
actor $ party_only <- F
} else {
actorids <- actor $ `_source.actorId`
actor $ party_only <- NULL
}
actor_aggregator <- function ( year , query , actor , actorids , default_operator , localhost = F , es_pwd ) {
if ( year > 0 ) {
query <- paste0 ( ' computerCodes.actors:(' , paste ( actorids , collapse = ' ' ) , ' ) && publication_date:[' , year , ' -01-01 TO ' , year , ' -12-31]' )
} else {
query <- paste0 ( ' computerCodes.actors:(' , paste ( actorids , collapse = ' ' ) , ' ) && publication_date:[' , actor $ `_source.startDate` , ' TO ' , actor $ `_source.endDate` , ' ]' )
}
### Temporary exception for UK missing junk coding
if ( actor $ `_source.country` != ' uk' ) {
query <- paste0 ( query , ' && computerCodes.junk:0' )
}
out <- elasticizer ( query_string ( paste0 ( ' country:' , actor $ `_source.country` , ' && ' , query ) ,
fields = fields , default_operator = default_operator ) ,
localhost = localhost ,
es_pwd = es_pwd )
if ( length ( out $ `_id` ) > 0 ) {
if ( is.null ( sent_dict ) == F ) {
out_ud <- out %>% select ( `_id` , `_source.ud` )
out <- out %>% select ( - `_source.ud` )
}
### Generating actor dataframe, unnest by actorsDetail, then by actor ids. Filter out non-relevant actor ids.
out <- out %>%
unnest ( `_source.computerCodes.actorsDetail` , .preserve = colnames ( .) ) %>%
unnest ( ids , .preserve = colnames ( .) ) %>%
filter ( ids1 %in% actorids ) # %>%
# select(-ends_with('start')) %>%
# select(-ends_with('end')) %>%
# select(-starts_with('ids'))
### Only if there are more rows than articles, recalculate
if ( length ( unique ( out $ `_id` ) ) != length ( out $ `_id` ) ) {
duplicates <- out [ ( duplicated ( out $ `_id` ) | duplicated ( out $ `_id` , fromLast = T ) ) , ]
actor_single <- out [ ! ( duplicated ( out $ `_id` ) | duplicated ( out $ `_id` , fromLast = T ) ) , ]
art_id <- unique ( duplicates $ `_id` )
dupe_merged <- bind_rows ( future_lapply ( art_id , aggregator , duplicates = duplicates ) )
out <- bind_rows ( dupe_merged , actor_single )
}
if ( is.null ( sent_dict ) == F ) {
out <- left_join ( out , out_ud , by = ' _id' )
out <- bind_rows ( future_lapply ( seq ( 1 , nrow ( out ) , 1 ) , par_sent , out = out , sent_dict = sent_dict ) )
}
### Creating date grouping variables
out <- out %>%
mutate (
year = strftime ( `_source.publication_date` , format = ' %Y' ) ,
yearmonth = strftime ( out $ `_source.publication_date` , format = ' %Y%m' ) ,
yearmonthday = strftime ( out $ `_source.publication_date` , format = ' %Y%m%d' ) ,
yearweek = strftime ( out $ `_source.publication_date` , format = " %Y%V" )
) %>%
select (
- `_score` ,
- `_index` ,
- `_type` ,
- `_score` ,
- `_source.computerCodes.actorsDetail` ,
- ids1
)
levels <- c ( ' year' , ' yearmonth' , ' yearmonthday' , ' yearweek' )
aggregate_data <- bind_rows ( lapply ( levels , grouper , out = out , actorids = actorids , sent = ! is.null ( sent_dict ) ) )
return ( aggregate_data )
} else {
return ( )
}
}
saveRDS ( bind_rows ( lapply ( years , actor_aggregator , query , actor , actorids , default_operator , localhost , es_pwd ) ) , file = paste0 ( actor $ `_source.country` , ' _' , paste0 ( actorids , collapse = ' ' ) , actor $ `_source.function` , startDate , endDate , ' .Rds' ) )
print ( paste0 ( ' Done with ' , row , ' /' , nrow ( actors ) , ' actors' ) )
return ( )
}