You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mamlr/R/elasticizer.R

153 lines
5.5 KiB

#' Generate a data frame out of unparsed Elasticsearch JSON
#'
#' Generate a data frame out of unparsed Elasticsearch JSON
#' @param query A JSON-formatted query in the Elasticsearch query DSL
#' @param src Logical (true/false) indicating whether or not the source of each document should be retrieved
#' @param index The name of the Elasticsearch index to search through
#' @param es_pwd The password for Elasticsearch read access
#' @param batch_size Batch size
#' @param max_batch Maximum number batches to retrieve
#' @param time_scroll Time to keep the scroll instance open (defaults to 5m, with a maximum of 500 allowed instances, so a maximum of 100 per minute)
#' @param dump Boolean indicating whether the data frames should be returned, or dumped as .Rds files
#' @param update When set, indicates an update function to use on each batch of 1000 articles
#' @param local Defaults to false. When true, connect to a local Elasticsearch instance on the default port (9200)
#' @param ... Parameters passed on to the update function
#' @return A data frame containing all the search results
#' @export
#' @examples
#' elasticizer(query, src = TRUE, index = "maml", update = NULL, localhost = F)
#################################################################################################
#################################### Get data from ElasticSearch ################################
#################################################################################################
elasticizer <- function(query, src = T, index = 'maml', es_pwd = .rs.askForPassword("Elasticsearch READ"), batch_size = 1024, max_batch = Inf, time_scroll = "5m", dump = F, update = NULL, localhost = F, ...){
retries <- 10 ### Number of retries on error
sleep <- 30 ### Number of seconds between retries
httr::set_config(httr::config(http_version = 0))
## Transitional code for syntax change in elastic package
if (packageVersion("elastic") < 1) {
if (localhost == F) {
connect(es_port = 443,
es_transport = 'https',
es_host = 'linux01.uis.no',
es_path = 'es',
es_user = 'es',
es_pwd = es_pwd,
errors = 'complete')
}
if (localhost == T){
connect(es_port = 9200,
es_transport = 'http',
es_host = 'localhost',
es_path = '',
es_user = '',
es_pwd = '',
errors = 'complete')
}
conn <- NULL
} else {
if (localhost == F) {
conn <- connect(port = 443,
transport = 'https',
host = 'linux01.uis.no',
path = 'es',
user = 'es',
pwd = es_pwd,
errors = 'complete')
}
if (localhost == T){
conn <- connect(port = 9200,
transport = 'http',
host = 'localhost',
path = '',
user = '',
pwd = '',
errors = 'complete')
}
}
# Get all results - one approach is to use a while loop
if (src == T) {
res <- NULL
attempt <- 0
while( is.null(res) && attempt <= retries ) {
if (attempt > 0) {
Sys.sleep(sleep)
}
attempt <- attempt + 1
try(
res <- Search(conn = conn, index = index, time_scroll=time_scroll,body = query, size = batch_size, raw=T)
)
if (attempt > 1) {
print(paste0('Successful after ',attempt,' attempts'))
}
}
}
if (src == F) {
res <- NULL
attempt <- 0
while( is.null(res) && attempt <= retries ) {
if (attempt > 0) {
Sys.sleep(sleep)
}
attempt <- attempt + 1
try(
res <- Search(conn = conn, index = index, time_scroll=time_scroll,body = query, size = batch_size, raw=T, source = F)
)
if (attempt > 1) {
print(paste0('Successful after ',attempt,' attempts'))
}
}
}
json <- fromJSON(res)
if (json$hits$total$value == 0) {
scroll_clear(conn = conn, x = json$`_scroll_id`)
return(json)
} else {
out <- jsonlite:::flatten(json$hits$hits)
total <- json$hits$total$value
hits <- length(json$hits$hits)
batch <- 1
print(paste0('Processing documents ',batch*batch_size-batch_size,' through ',batch*batch_size,' out of ',total,' documents.'))
if (length(update) > 0){
update(out, localhost = localhost, ...)
}
while(hits > 0 && batch < max_batch ){
res <- NULL
attempt <- 0
while( is.null(res) && attempt <= retries ) {
if (attempt > 0) {
Sys.sleep(sleep)
}
attempt <- attempt + 1
try(
res <- scroll(conn = conn, json$`_scroll_id`, time_scroll=time_scroll, raw=T)
)
if (attempt > 1) {
print(paste0('Successful after ',attempt,' attempts'))
}
}
json <- fromJSON(res)
hits <- length(json$hits$hits)
if(hits > 0) {
batch <- batch+1
print(paste0('Processing documents ',batch*batch_size-batch_size,' through ',batch*batch_size,' out of ',total,' documents.'))
if (length(update) > 0){
out <- jsonlite:::flatten(json$hits$hits)
update(out, localhost = localhost, ...)
} else {
out <- bind_rows(out, jsonlite:::flatten(json$hits$hits))
}
}
}
if (length(update) > 0) {
scroll_clear(conn = conn, x = json$`_scroll_id`)
return("Done updating")
} else if (dump) {
saveRDS(out, file = paste0('batch_',batch*batch_size,'.Rds'))
} else {
scroll_clear(conn = conn, x = json$`_scroll_id`)
return(out)
}
}
}