actor_aggregation: updated to use the future package for parallel processing, as a beta test for switching all parallel processing to future. Also disabled some of the aggregator output to save computation time.

master
Erik de Vries 6 years ago
parent f8bc53006d
commit edd4b785a5

@ -13,8 +13,9 @@ Depends: R (>= 3.3.1),
caret, caret,
e1071, e1071,
udpipe, udpipe,
parallel, SparseM,
SparseM future,
future.apply
License: Copyright Erik de Vries License: Copyright Erik de Vries
Encoding: UTF-8 Encoding: UTF-8
LazyData: true LazyData: true

@ -31,14 +31,14 @@ actor_aggregation <- function(row, actors, es_pwd, localhost, default_operator =
summarise( summarise(
`_source.doctype` = first(`_source.doctype`), `_source.doctype` = first(`_source.doctype`),
`_source.publication_date` = first(`_source.publication_date`), `_source.publication_date` = first(`_source.publication_date`),
actor_end = list(sort(unique(unlist(actor_end)))), # actor_end = list(sort(unique(unlist(actor_end)))),
prom = list(length(unique(unlist(sentence_id)))/round(occ[[1]]/prom[[1]])), prom = list(length(unique(unlist(sentence_id)))/round(occ[[1]]/prom[[1]])),
sentence_id = list(sort(unique(unlist(sentence_id)))), sentence_id = list(sort(unique(unlist(sentence_id)))),
rel_first = list(max(unlist(rel_first))), rel_first = list(max(unlist(rel_first))),
sentence_end = list(sort(unique(unlist(sentence_end)))), # sentence_end = list(sort(unique(unlist(sentence_end)))),
actor_start = list(sort(unique(unlist(actor_start)))), # actor_start = list(sort(unique(unlist(actor_start)))),
ids = list(sort(unique(unlist(ids)))), ids = list(unique(unlist(ids))),
sentence_start = list(sort(unique(unlist(sentence_start)))), # sentence_start = list(sort(unique(unlist(sentence_start)))),
occ = list(length(unique(unlist(sentence_id)))), occ = list(length(unique(unlist(sentence_id)))),
first = list(min(unlist(sentence_id))) first = list(min(unlist(sentence_id)))
) )
@ -179,6 +179,7 @@ actor_aggregation <- function(row, actors, es_pwd, localhost, default_operator =
return(output) return(output)
} }
########################################################################################### ###########################################################################################
plan(multiprocess, workers = cores)
if (is.null(sent_dict) == F) { if (is.null(sent_dict) == F) {
fields <- c('ud','computerCodes.actorsDetail', 'doctype', 'publication_date') fields <- c('ud','computerCodes.actorsDetail', 'doctype', 'publication_date')
} else { } else {
@ -239,13 +240,13 @@ actor_aggregation <- function(row, actors, es_pwd, localhost, default_operator =
duplicates <- out[(duplicated(out$`_id`) | duplicated(out$`_id`, fromLast = T)),] duplicates <- out[(duplicated(out$`_id`) | duplicated(out$`_id`, fromLast = T)),]
actor_single <- out[!(duplicated(out$`_id`) | duplicated(out$`_id`, fromLast = T)),] actor_single <- out[!(duplicated(out$`_id`) | duplicated(out$`_id`, fromLast = T)),]
art_id <- unique(duplicates$`_id`) art_id <- unique(duplicates$`_id`)
dupe_merged <- bind_rows(mclapply(art_id, aggregator, duplicates = duplicates, mc.cores = cores)) dupe_merged <- bind_rows(future_lapply(art_id, aggregator, duplicates = duplicates))
out <- bind_rows(dupe_merged, actor_single) out <- bind_rows(dupe_merged, actor_single)
} }
if (is.null(sent_dict) == F) { if (is.null(sent_dict) == F) {
out <- left_join(out, out_ud, by = '_id') out <- left_join(out, out_ud, by = '_id')
out <- bind_rows(mclapply(seq(1,nrow(out),1),par_sent, out = out, sent_dict = sent_dict, mc.cores = cores)) out <- bind_rows(future_lapply(seq(1,nrow(out),1),par_sent, out = out, sent_dict = sent_dict))
} }
### Creating date grouping variables ### Creating date grouping variables

Loading…
Cancel
Save