#' Generate a data frame out of unparsed Elasticsearch JSON
#'
#' Runs a query against an Elasticsearch index, pages through the results
#' using the scroll API, and either accumulates all hits into a data frame
#' or feeds each batch to a user-supplied `update` function.
#'
#' @param query A JSON-formatted query in the Elasticsearch query DSL
#' @param src Logical (TRUE/FALSE) indicating whether or not the source of each document should be retrieved
#' @param index The name of the Elasticsearch index to search through
#' @param es_pwd The password for Elasticsearch read access
#' @param batch_size Batch size
#' @param max_batch Maximum number of batches to retrieve
#' @param time_scroll Time to keep the scroll instance open (defaults to 5m, with a maximum of 500 allowed instances, so a maximum of 100 per minute)
#' @param update When set, indicates an update function to use on each batch of articles
#' @param localhost Defaults to FALSE. When TRUE, connect to a local Elasticsearch instance on the default port (9200)
#' @param ... Parameters passed on to the update function
#' @return A data frame containing all the search results; the string
#'   "Done updating" when `update` is supplied; or the raw parsed JSON when
#'   the query matches zero documents
#' @export
#' @examples
#' \dontrun{
#' elasticizer(query, src = TRUE, index = "maml", update = NULL, localhost = FALSE)
#' }
#################################################################################################
#################################### Get data from ElasticSearch ################################
#################################################################################################
elasticizer <- function(query, src = TRUE, index = 'maml',
                        es_pwd = .rs.askForPassword("Elasticsearch READ"),
                        batch_size = 1024, max_batch = Inf, time_scroll = "5m",
                        update = NULL, localhost = FALSE, ...) {
  retries <- 10 ### Number of retries on error
  sleep <- 30   ### Number of seconds between retries
  # Force HTTP version negotiation off (workaround for HTTP/2 issues with curl)
  httr::set_config(httr::config(http_version = 0))

  # Evaluate `f()` until it returns non-NULL or the retry budget is spent,
  # sleeping between attempts. Errors inside f() are caught by try(), leaving
  # the result NULL so the loop retries. Reports only on genuine success
  # after at least one failed attempt.
  retry <- function(f) {
    res <- NULL
    attempt <- 0
    while (is.null(res) && attempt <= retries) {
      if (attempt > 0) {
        Sys.sleep(sleep)
      }
      attempt <- attempt + 1
      try(res <- f())
      if (attempt > 1 && !is.null(res)) {
        print(paste0('Successful after ', attempt, ' attempts'))
      }
    }
    res
  }

  ## Transitional code for the syntax change in the elastic package (>= 1.0):
  ## old versions configure a global connection (es_* arguments, conn stays
  ## NULL); new versions return a connection object passed to every call.
  if (packageVersion("elastic") < 1) {
    if (localhost) {
      connect(es_port = 9200, es_transport = 'http', es_host = 'localhost',
              es_path = '', es_user = '', es_pwd = '', errors = 'complete')
    } else {
      connect(es_port = 443, es_transport = 'https', es_host = 'linux01.uis.no',
              es_path = 'es', es_user = 'es', es_pwd = es_pwd, errors = 'complete')
    }
    conn <- NULL
  } else {
    if (localhost) {
      conn <- connect(port = 9200, transport = 'http', host = 'localhost',
                      path = '', user = '', pwd = '', errors = 'complete')
    } else {
      conn <- connect(port = 443, transport = 'https', host = 'linux01.uis.no',
                      path = 'es', user = 'es', pwd = es_pwd, errors = 'complete')
    }
  }

  # Initial search, with or without the document _source, retried on failure.
  if (isTRUE(src)) {
    res <- retry(function() {
      Search(conn = conn, index = index, time_scroll = time_scroll,
             body = query, size = batch_size, raw = TRUE)
    })
  } else {
    res <- retry(function() {
      Search(conn = conn, index = index, time_scroll = time_scroll,
             body = query, size = batch_size, raw = TRUE, source = FALSE)
    })
  }

  json <- fromJSON(res)
  if (json$hits$total$value == 0) {
    # Nothing matched: release the scroll context and hand back the raw JSON.
    scroll_clear(conn = conn, x = json$`_scroll_id`)
    return(json)
  }

  # First batch: flatten nested hit fields into plain data-frame columns.
  out <- jsonlite::flatten(json$hits$hits)
  total <- json$hits$total$value
  hits <- length(json$hits$hits)
  batch <- 1
  print(paste0('Processing documents ', batch * batch_size - batch_size,
               ' through ', batch * batch_size, ' out of ', total, ' documents.'))
  if (length(update) > 0) {
    update(out, localhost = localhost, ...)
  }

  # Scroll through the remaining batches until the index is exhausted
  # (empty batch) or max_batch is reached.
  while (hits > 0 && batch < max_batch) {
    res <- retry(function() {
      scroll(conn = conn, json$`_scroll_id`, time_scroll = time_scroll, raw = TRUE)
    })
    json <- fromJSON(res)
    hits <- length(json$hits$hits)
    if (hits > 0) {
      batch <- batch + 1
      print(paste0('Processing documents ', batch * batch_size - batch_size,
                   ' through ', batch * batch_size, ' out of ', total, ' documents.'))
      if (length(update) > 0) {
        # Update mode: each batch is processed and discarded, so memory use
        # stays bounded regardless of the total result size.
        out <- jsonlite::flatten(json$hits$hits)
        update(out, localhost = localhost, ...)
      } else {
        out <- bind_rows(out, jsonlite::flatten(json$hits$hits))
      }
    }
  }

  # Always release the server-side scroll context when done.
  scroll_clear(conn = conn, x = json$`_scroll_id`)
  if (length(update) > 0) {
    return("Done updating")
  }
  out
}