Merged
39 changes: 19 additions & 20 deletions r-pkg/R/es_search.R
@@ -222,7 +222,6 @@ es_search <- function(es_host
# \item \href{http://stackoverflow.com/questions/25453872/why-does-this-elasticsearch-scan-and-scroll-keep-returning-the-same-scroll-id}{More background on how/why Elasticsearch generates and changes the scroll_id}
# }
#' @importFrom data.table rbindlist setkeyv
#' @importFrom httr RETRY content
#' @importFrom jsonlite fromJSON
#' @importFrom parallel clusterMap makeForkCluster makePSOCKcluster stopCluster
#' @importFrom uuid UUIDgenerate
@@ -463,7 +462,7 @@ es_search <- function(es_host
# hits_to_pull - Total hits to be pulled (documents matching user's query).
# Or, in the case where max_hits < number of matching docs,
# max_hits.
#' @importFrom httr add_headers content RETRY stop_for_status
#' @importFrom httr add_headers
#' @importFrom jsonlite fromJSON
#' @importFrom uuid UUIDgenerate
.keep_on_pullin <- function(scroll_id
@@ -495,8 +494,8 @@ es_search <- function(es_host
, scroll = scroll
, scroll_id = scroll_id
)
httr::stop_for_status(result)
resultJSON <- httr::content(result, as = "text")
.stop_for_status(result)
resultJSON <- .content(result, as = "text")

# Parse to JSON to get total number of documents + new scroll_id
resultList <- jsonlite::fromJSON(resultJSON, simplifyVector = FALSE)
@@ -531,17 +530,17 @@ es_search <- function(es_host
# [name] .new_scroll_request
# [description] Make a scrolling request and return the result
# [references] https://www.elastic.co/guide/en/elasticsearch/reference/6.x/search-request-scroll.html
#' @importFrom httr add_headers RETRY
#' @importFrom httr add_headers
.new_scroll_request <- function(es_host, scroll, scroll_id) {

# Set up scroll_url
scroll_url <- paste0(es_host, "/_search/scroll") # nolint[absolute_path,non_portable_path]

# Get the next page
result <- httr::RETRY(
result <- .request(
verb = "POST"
, httr::add_headers(c("Content-Type" = "application/json")) # nolint[non_portable_path]
, url = scroll_url
, config = httr::add_headers(c("Content-Type" = "application/json")) # nolint[non_portable_path]
, body = sprintf('{"scroll": "%s", "scroll_id": "%s"}', scroll, scroll_id)
)
return(result)
@@ -550,17 +549,17 @@
# [title] Make a scroll request with the strategy supported by Elasticsearch 1.x and Elasticsearch 2.x
# [name] .legacy_scroll_request
# [description] Make a scrolling request and return the result
#' @importFrom httr add_headers RETRY
#' @importFrom httr add_headers
.legacy_scroll_request <- function(es_host, scroll, scroll_id) {

# Set up scroll_url
scroll_url <- paste0(es_host, "/_search/scroll?scroll=", scroll)

# Get the next page
result <- httr::RETRY(
result <- .request(
verb = "POST"
, httr::add_headers(c("Content-Type" = "application/json")) # nolint[non_portable_path]
, url = scroll_url
, config = httr::add_headers(c("Content-Type" = "application/json")) # nolint[non_portable_path]
, body = scroll_id
)
return(result)
@@ -629,20 +628,20 @@ es_search <- function(es_host
# version of Elasticsearch.
# [param] es_host A string identifying an Elasticsearch host. This should be of the form
# [transfer_protocol][hostname]:[port]. For example, 'http://myindex.thing.com:9200'.
#' @importFrom httr content RETRY stop_for_status
.get_es_version <- function(es_host) {

# Hit the cluster root to get metadata
log_info("Checking Elasticsearch version...")
result <- httr::RETRY(
result <- .request(
verb = "GET"
, httr::add_headers(c("Content-Type" = "application/json")) # nolint[non_portable_path]
, url = es_host
, config = httr::add_headers(c("Content-Type" = "application/json")) # nolint[non_portable_path]
, body = NULL
)
httr::stop_for_status(result)
.stop_for_status(result)

# Extract version number from the result
version <- httr::content(result, as = "parsed")[["version"]][["number"]]
version <- .content(result, as = "parsed")[["version"]][["number"]]
log_info(sprintf("uptasticsearch thinks you are running Elasticsearch %s", version))

# Parse out just the major version. We can adjust this if we find
@@ -702,7 +701,7 @@ es_search <- function(es_host
# write(result, 'results.json')
#
# }
#' @importFrom httr add_headers content RETRY stop_for_status
#' @importFrom httr add_headers
.search_request <- function(es_host
, es_index
, trailing_args = NULL
@@ -719,14 +718,14 @@ es_search <- function(es_host
}

# Make request
result <- httr::RETRY(
result <- .request(
verb = "POST"
, httr::add_headers(c("Content-Type" = "application/json")) # nolint[non_portable_path]
, url = reqURL
, config = httr::add_headers(c("Content-Type" = "application/json")) # nolint[non_portable_path]
, body = query_body
)
httr::stop_for_status(result)
result <- httr::content(result, as = "text")
.stop_for_status(result)
result <- .content(result, as = "text")

return(result)
}
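
Taken together, the es_search.R hunks replace each direct httr call sequence with the same three-step pattern: issue the request through .request(), check it with .stop_for_status(), then extract the body with .content(). A minimal sketch of that pattern, using placeholder values rather than anything taken from the package:

# illustrative only: the URL and query body below are placeholders
result <- .request(
    verb = "POST"
    , url = "http://example.com:9200/_search"
    , config = httr::add_headers(c("Content-Type" = "application/json"))
    , body = '{"query": {"match_all": {}}}'
)
.stop_for_status(result)
resultJSON <- .content(result, as = "text")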
29 changes: 16 additions & 13 deletions r-pkg/R/get_fields.R
@@ -4,7 +4,7 @@
#' @description For a given Elasticsearch index, return the mapping from field name
#' to data type for all indexed fields.
#' @importFrom data.table := as.data.table rbindlist uniqueN
#' @importFrom httr add_headers content RETRY stop_for_status
#' @importFrom httr add_headers
#' @importFrom jsonlite fromJSON
#' @importFrom purrr map2
#' @param es_indices A character vector that contains the names of indices for
@@ -56,14 +56,15 @@ get_fields <- function(es_host
)
, major_version
))
res <- httr::RETRY(
res <- .request(
verb = "GET"
, url = sprintf("%s/_cat/indices?format=json", es_url)
, times = 3
, config = list()
, body = NULL
)
indexDT <- data.table::as.data.table(
jsonlite::fromJSON(
httr::content(res, "text")
.content(res, as = "text")
, simplifyDataFrame = TRUE
)
)
@@ -79,13 +80,14 @@
########################## make the query ################################
log_info(paste("Getting indexed fields for indices:", indices))

result <- httr::RETRY(
result <- .request(
verb = "GET"
, url = es_url
, httr::add_headers(c("Content-Type" = "application/json")) # nolint[non_portable_path]
, config = httr::add_headers(c("Content-Type" = "application/json")) # nolint[non_portable_path]
, body = NULL
)
httr::stop_for_status(result)
resultContent <- httr::content(result, as = "parsed")
.stop_for_status(result)
resultContent <- .content(result, as = "parsed")

######################### flatten the result ##############################
if (as.integer(major_version) > 6) {
@@ -190,20 +192,21 @@ get_fields <- function(es_host

# [title] Get a data.table containing names of indices and aliases
# [es_host] A string identifying an Elasticsearch host.
#' @importFrom httr add_headers content RETRY stop_for_status
#' @importFrom httr add_headers
.get_aliases <- function(es_host) {

# construct the url to the alias endpoint
url <- paste0(es_host, "/_cat/aliases") # nolint[absolute_path, non_portable_path]

# make the request
result <- httr::RETRY(
result <- .request(
verb = "GET"
, url = url
, httr::add_headers(c("Content-Type" = "application/json")) # nolint[non_portable_path]
, config = httr::add_headers(c("Content-Type" = "application/json")) # nolint[non_portable_path]
, body = NULL
)
httr::stop_for_status(result)
resultContent <- httr::content(result, as = "text")
.stop_for_status(result)
resultContent <- .content(result, as = "text")

# NOTES:
# - with Elasticsearch 1.7.2., this returns an empty array "[]"
34 changes: 34 additions & 0 deletions r-pkg/R/helperfuns.R
@@ -0,0 +1,34 @@
# [title] Extract the content of an HTTP response into a different format
# [name] .content
# [description] Mainly here to make mocking easier in testing.
# [references] https://testthat.r-lib.org/reference/local_mocked_bindings.html#namespaced-calls
#' @importFrom httr content
.content <- function(response, as) {
return(httr::content(response, as = as))
}

# [title] Execute an HTTP request and return the result
# [name] .request
# [description] Mainly here to make mocking easier in testing, but this
# also centralizes the mechanism for HTTP request execution in one place.
# [references] https://testthat.r-lib.org/reference/local_mocked_bindings.html#namespaced-calls
#' @importFrom httr RETRY
.request <- function(verb, url, config, body) {
result <- httr::RETRY(
verb = verb
, url = url
, config = config
, body = body
)
return(result)
}

# [title] Raise an exception if an HTTP response indicates an error
# [name] .stop_for_status
# [description] Mainly here to make mocking easier in testing.
# [references] https://testthat.r-lib.org/reference/local_mocked_bindings.html#namespaced-calls
#' @importFrom httr stop_for_status
.stop_for_status <- function(response) {
httr::stop_for_status(response)
return(invisible(NULL))
}
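
These wrappers exist so that tests can stub the HTTP layer by mocking bindings inside the package itself, which is what testthat::with_mocked_bindings() / local_mocked_bindings() support (see the reference link in the comments above). A minimal sketch of that usage, assuming the package's internals are visible to its own test files; the canned response and the exact return shape of .get_es_version() are assumptions here, since its full body is truncated in this diff:

test_that(".get_es_version can be exercised against a stubbed HTTP layer", {
    testthat::local_mocked_bindings(
        `.request` = function(...) NULL
        , `.stop_for_status` = function(...) invisible(NULL)
        , `.content` = function(...) list(version = list(number = "7.10.2"))
    )
    # no network traffic happens here; the canned list above stands in for the
    # parsed cluster-root response
    version <- .get_es_version(es_host = "http://example.com:9200")
    # exact return shape is an assumption; the es_search.R hunk above is truncated
    expect_length(version, 1L)
})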
72 changes: 43 additions & 29 deletions r-pkg/tests/testthat/test-get_fields.R
@@ -20,35 +20,49 @@ futile.logger::flog.threshold(0)
regexp = "get_fields must be passed a valid es_indices")
})

# works as expected when mocked
test_that("get_fields works as expected when mocked", {
test_json <- system.file("testdata", "two_index_mapping.json", package = "uptasticsearch")
aliasDT <- data.table::data.table(alias = c("alias1", "alias2")
, index = c("company", "otherIndex"))
# nolint start
testthat::with_mock(
`httr::stop_for_status` = function(...) {return(NULL)},
`httr::RETRY` = function(...) {return(NULL)},
`httr::content` = function(...) {return(jsonlite::fromJSON(txt = test_json))},
`uptasticsearch::.get_aliases` = function(...) {return(aliasDT)},
`uptasticsearch::.get_es_version` = function(...) {return("6")},
{
outDT <- get_fields(es_host = "http://db.mycompany.com:9200"
, es_indices = c("company", "hotel"))
data.table::setkey(outDT, NULL)
expected <- data.table::data.table(
index = c(rep("alias1", 3), rep("hotel", 5))
, type = c(rep("building", 3), rep("bed_room", 2), rep("conference_room", 3))
, field = c("id", "address", "address.keyword", "num_beds", "description"
, "num_people", "purpose", "purpose.keyword")
, data_type = c("long", "text", "keyword", "integer", "text", "integer"
, "text", "keyword")
)
expect_identical(outDT, expected)
}
)
# nolint end
})
# works as expected when mocked
test_that("get_fields works as expected when mocked", {

test_json <- system.file("testdata", "two_index_mapping.json", package = "uptasticsearch")
aliasDT <- data.table::data.table(
alias = c("alias1", "alias2")
, index = c("company", "otherIndex")
)
testthat::with_mocked_bindings(
`.content` = function(...) {
return(jsonlite::fromJSON(txt = test_json))
},
`.get_aliases` = function(...) {
return(aliasDT)
},
`.get_es_version` = function(...) {
return("6")
},
`.request` = function(...) {
return(NULL)
},
`.stop_for_status` = function(...) {
return(NULL)
},
{
outDT <- get_fields(
es_host = "http://db.mycompany.com:9200"
, es_indices = c("company", "hotel")
)
data.table::setkey(outDT, NULL)
expected <- data.table::data.table(
index = c(rep("alias1", 3), rep("hotel", 5))
, type = c(rep("building", 3), rep("bed_room", 2), rep("conference_room", 3))
, field = c("id", "address", "address.keyword", "num_beds", "description"
, "num_people", "purpose", "purpose.keyword")
, data_type = c("long", "text", "keyword", "integer", "text", "integer"
, "text", "keyword")
)
expect_identical(outDT, expected)
}
)
})

#--- .flatten_mapping
