Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
cf52e62
Use "Dynamodb" as a variable name instead of "DynamoDB" for the namin…
civitaspo Feb 12, 2020
a5a6b62
Make current operations deprecated
civitaspo Feb 14, 2020
029add9
Define new operations configuration that supports all configuration f…
civitaspo Feb 16, 2020
57970b9
Show deprecation warnings and configure PluginTask
civitaspo Feb 16, 2020
e6e9583
Add "override" modifier to DynamodbInputPlugin & Throw UnsupportedOpe…
civitaspo Feb 17, 2020
599ed19
Introduce column_options and type_options to handle dynamodb attribut…
civitaspo Feb 18, 2020
464afde
Move deprecated plugin operations into DeprecatedDynamodbInputPlugin
civitaspo Feb 20, 2020
cdfa4d0
Avoid wildcard import
civitaspo Feb 20, 2020
8611746
mv test/run_dynamodb_local.sh .
civitaspo Feb 20, 2020
a69dae9
Make example runnable only in local
civitaspo Feb 20, 2020
298fb39
Make the old example config deprecated
civitaspo Feb 20, 2020
bedd970
Rename org.embulk.input.dynamodb.model.AttributeValue -> org.embulk.i…
civitaspo Feb 20, 2020
a8107c7
Find a bug in deprecated operation
civitaspo Feb 21, 2020
f510dfe
Add new operation request logic
civitaspo Feb 21, 2020
b3ee000
Renew DynamodbAttributeValue
civitaspo Feb 26, 2020
ce1a65c
Remove ColumnOptions and TypeOptions
civitaspo Feb 26, 2020
d327d88
Introduce DynamodbItemSchema and DynamodbItemColumn instead of Embulk…
civitaspo Feb 26, 2020
d18a0db
Introduce DynamodbAttributeValueType
civitaspo Feb 27, 2020
887bdad
Implement DynamodbAttributeValueConverter
civitaspo Mar 1, 2020
ca59407
Add DynamodbItemReader & DynamodbItemIterator
civitaspo Mar 1, 2020
7bf1297
Introduce DynamodbOperationProxy
civitaspo Mar 5, 2020
3c6821b
Delete AbstractDynamodbOperation, use DynamodbOperationCommonOptions …
civitaspo Mar 5, 2020
62ae856
Revert "Delete AbstractDynamodbOperation, use DynamodbOperationCommon…
civitaspo Mar 5, 2020
290a98b
Rearrange dynamodb.operation interfaces
civitaspo Mar 5, 2020
07e4e82
Implement Operations
civitaspo Mar 5, 2020
0440502
Implement the main logic with trivial fixes.
civitaspo Mar 5, 2020
28d8b30
Reduce dependencies between schema / attributeValue transforming
civitaspo Mar 5, 2020
09364b5
Store items as json if "columns" option is not defined.
civitaspo Mar 6, 2020
7ab257c
Rename operation tests
civitaspo Mar 6, 2020
bed2039
Implement String Converter correctly.
civitaspo Mar 6, 2020
fae86a3
Add more examples
civitaspo Mar 6, 2020
abfc4a6
Add compatibility tests
civitaspo Mar 6, 2020
df9ced6
Log operation requests for dynamodb.
civitaspo Mar 6, 2020
ff3c19c
Validate "batch_size"
civitaspo Mar 6, 2020
8fd9efc
Use java.lang.Long instead of scala Long because of embulk spec.
civitaspo Mar 6, 2020
1887720
Throw ConfigException when "columns" option is not set in DeprecatedD…
civitaspo Mar 6, 2020
5d3fef9
Validate invalid attribute values.
civitaspo Mar 8, 2020
7507442
Remove delegated methods with delegation macro annotation.
civitaspo Mar 8, 2020
6de0c4f
Validate the case that "operation" option is used with "query" or "sc…
civitaspo Mar 8, 2020
a80aa5f
Fix the deprecated operation: avoid NullPointerException when Attribu…
civitaspo Mar 8, 2020
7d00e45
Avoid OOM in Github Actions
civitaspo Mar 8, 2020
1b2dad2
Use `"record"` as the default value of **json_column_name** referring…
civitaspo Mar 8, 2020
1ba8e48
Remove duplicated options.
civitaspo Mar 8, 2020
4918fcf
Write README
civitaspo Mar 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 109 additions & 41 deletions README.md

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ dependencies {

compile "com.amazonaws:aws-java-sdk-dynamodb:1.11.711"
compile "com.amazonaws:aws-java-sdk-sts:1.11.711"
// For @delegate macro.
compile "dev.zio:zio-macros-core_2.13:0.6.2"

testCompile "junit:junit:4.+"
testCompile "org.embulk:embulk-standards:0.9.23"
Expand All @@ -35,6 +37,17 @@ dependencies {
testCompile "org.embulk:embulk-test:0.9.23"
}

compileScala {
scalaCompileOptions.additionalParameters = [
"-Ymacro-annotations"
]
}

test {
jvmArgs '-Xms4g', '-Xmx4g', '-XX:MaxMetaspaceSize=1g'
maxHeapSize = "4g"
}

spotless {
scala {
scalafmt('2.3.2').configFile('.scalafmt.conf')
Expand Down
20 changes: 20 additions & 0 deletions example/config-deprecated.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
in:
type: dynamodb
region: us-east-1
endpoint: http://localhost:8000
operation: scan
table: embulk-input-dynamodb_example
auth_method: basic
access_key_id: dummy
secret_access_key: dummy
columns:
- {name: primary-key, type: string}
- {name: sort-key, type: long}
- {name: doubleValue, type: double}
- {name: boolValue, type: boolean}
- {name: listValue, type: json}
- {name: mapValue, type: json}

out:
type: stdout

18 changes: 18 additions & 0 deletions example/config-query-as-json.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
in:
type: dynamodb
region: us-east-1
endpoint: http://localhost:8000
query:
key_condition_expression: "#x = :v"
expression_attribute_names:
"#x": primary-key
expression_attribute_values:
":v": {S: key-1}
table: embulk-input-dynamodb_example
auth_method: basic
access_key_id: dummy
secret_access_key: dummy

out:
type: stdout

22 changes: 22 additions & 0 deletions example/config-query.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
in:
type: dynamodb
region: us-east-1
endpoint: http://localhost:8000
query:
key_condition_expression: "#x = :v"
expression_attribute_names:
"#x": primary-key
expression_attribute_values:
":v": {S: key-1}
table: embulk-input-dynamodb_example
auth_method: basic
access_key_id: dummy
secret_access_key: dummy
columns:
- {name: primary-key, type: string}
- {name: sort-key, type: long}
- {name: value, type: string}

out:
type: stdout

18 changes: 18 additions & 0 deletions example/config-scan.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
in:
type: dynamodb
region: us-east-1
endpoint: http://localhost:8000
scan:
total_segment: 20
table: embulk-input-dynamodb_example
auth_method: basic
access_key_id: dummy
secret_access_key: dummy
columns:
- {name: primary-key, type: string}
- {name: sort-key, type: long}
- {name: value, type: string}

out:
type: stdout

14 changes: 0 additions & 14 deletions example/config.yml

This file was deleted.

67 changes: 67 additions & 0 deletions example/prepare_dynamodb_table.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/usr/bin/env bash

if aws dynamodb describe-table --table-name='embulk-input-dynamodb_example' --endpoint-url http://localhost:8000 --region us-east-1 2>/dev/null; then
aws dynamodb delete-table --table-name='embulk-input-dynamodb_example' \
--endpoint-url http://localhost:8000 \
--region us-east-1
fi

aws dynamodb create-table \
--table-name='embulk-input-dynamodb_example' \
--attribute-definitions='[
{"AttributeName":"primary-key","AttributeType":"S"},
{"AttributeName":"sort-key","AttributeType":"N"}
]' \
--key-schema='[
{"AttributeName":"primary-key","KeyType":"HASH"},
{"AttributeName":"sort-key","KeyType":"RANGE"}
]' \
--provisioned-throughput='{"ReadCapacityUnits":5, "WriteCapacityUnits":5}' \
--endpoint-url http://localhost:8000 \
--region us-east-1

aws dynamodb put-item \
--table-name='embulk-input-dynamodb_example' \
--item='{
"primary-key" : { "S" : "key-1" },
"sort-key" : { "N" : "0" },
"doubleValue" : { "N" : "42.195" },
"boolValue" : { "BOOL" : true },
"listValue" : { "L":
[
{ "S" : "list-value"},
{ "N" : "123"}
]
},
"mapValue" : { "M":
{
"map-key-1" : { "S" : "map-value-1" },
"map-key-2" : { "N" : "456" }
}
}
}' \
--endpoint-url http://localhost:8000 \
--region us-east-1

aws dynamodb put-item \
--table-name='embulk-input-dynamodb_example' \
--item='{
"primary-key" : { "S" : "key-1" },
"sort-key" : { "N" : "1" },
"doubleValue" : { "NULL" : true },
"boolValue" : { "N" : "1" },
"listValue" : { "L":
[
{ "NULL" : true},
{ "N" : "123"}
]
},
"mapValue" : { "M":
{
"map-key-1" : { "S" : "map-value-1" },
"map-key-2" : { "N" : "456" }
}
}
}' \
--endpoint-url http://localhost:8000 \
--region us-east-1
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.embulk.input.dynamodb

import java.util.{List => JList}

import org.embulk.config.{
ConfigDiff,
ConfigException,
ConfigSource,
TaskReport,
TaskSource
}
import org.embulk.input.dynamodb.aws.Aws
import org.embulk.input.dynamodb.deprecated.ope.{QueryOperation, ScanOperation}
import org.embulk.spi.{Exec, InputPlugin, PageOutput, Schema}

@deprecated(since = "0.3.0")
object DeprecatedDynamodbInputPlugin extends InputPlugin {

override def transaction(
config: ConfigSource,
control: InputPlugin.Control
): ConfigDiff = {
val task: PluginTask = PluginTask.load(config)
val schema: Schema = task.getColumns.toSchema
if (schema.isEmpty)
throw new ConfigException("\"columns\" option must be set.")
val taskCount: Int = 1

control.run(task.dump(), schema, taskCount)
Exec.newConfigDiff()
}

override def resume(
taskSource: TaskSource,
schema: Schema,
taskCount: Int,
control: InputPlugin.Control
): ConfigDiff = {
throw new UnsupportedOperationException
}

override def run(
taskSource: TaskSource,
schema: Schema,
taskIndex: Int,
output: PageOutput
): TaskReport = {
val task: PluginTask = PluginTask.load(taskSource)

Aws(task).withDynamodb { dynamodb =>
task.getOperation.ifPresent { ope =>
val o = ope.toLowerCase match {
case "scan" => new ScanOperation(dynamodb)
case "query" => new QueryOperation(dynamodb)
}
o.execute(task, schema, output)
}
}

Exec.newTaskReport()
}

override def cleanup(
taskSource: TaskSource,
schema: Schema,
taskCount: Int,
successTaskReports: JList[TaskReport]
): Unit = {}

override def guess(config: ConfigSource): ConfigDiff = {
throw new UnsupportedOperationException
}
}
82 changes: 57 additions & 25 deletions src/main/scala/org/embulk/input/dynamodb/DynamodbInputPlugin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,97 @@ package org.embulk.input.dynamodb

import java.util.{List => JList}

import org.embulk.config._
import org.embulk.config.{ConfigDiff, ConfigSource, TaskReport, TaskSource}
import org.embulk.input.dynamodb.aws.Aws
import org.embulk.input.dynamodb.ope.{QueryOperation, ScanOperation}
import org.embulk.spi._
import org.embulk.input.dynamodb.item.DynamodbItemSchema
import org.embulk.input.dynamodb.operation.DynamodbOperationProxy
import org.embulk.spi.{Exec, InputPlugin, PageBuilder, PageOutput, Schema}

class DynamodbInputPlugin extends InputPlugin {

def transaction(
override def transaction(
config: ConfigSource,
control: InputPlugin.Control
): ConfigDiff = {
val task: PluginTask = config.loadConfig(classOf[PluginTask])
val task: PluginTask = PluginTask.load(config)
if (isDeprecatedOperationRequired(task))
return DeprecatedDynamodbInputPlugin.transaction(config, control)

val schema: Schema = task.getColumns.toSchema
val taskCount: Int = 1
val schema: Schema = DynamodbItemSchema(task).getEmbulkSchema
val taskCount: Int = DynamodbOperationProxy(task).getEmbulkTaskCount

resume(task.dump(), schema, taskCount, control)
control.run(task.dump(), schema, taskCount)
Exec.newConfigDiff()
}

def resume(
override def resume(
taskSource: TaskSource,
schema: Schema,
taskCount: Int,
control: InputPlugin.Control
): ConfigDiff = {
control.run(taskSource, schema, taskCount)
Exec.newConfigDiff()
val task: PluginTask = PluginTask.load(taskSource)
if (isDeprecatedOperationRequired(task))
return DeprecatedDynamodbInputPlugin.resume(
taskSource,
schema,
taskCount,
control
)
throw new UnsupportedOperationException
}

def run(
override def run(
taskSource: TaskSource,
schema: Schema,
taskIndex: Int,
output: PageOutput
): TaskReport = {
val task: PluginTask = taskSource.loadTask(classOf[PluginTask])
val task: PluginTask = PluginTask.load(taskSource)
if (isDeprecatedOperationRequired(task))
return DeprecatedDynamodbInputPlugin.run(
taskSource,
schema,
taskIndex,
output
)

Aws(task).withDynamoDB { dynamodb =>
val ope = task.getOperation.toLowerCase match {
case "scan" => new ScanOperation(dynamodb)
case "query" => new QueryOperation(dynamodb)
}
ope.execute(task, schema, output)
}
val pageBuilder = new PageBuilder(task.getBufferAllocator, schema, output)

Aws(task).withDynamodb { dynamodb =>
DynamodbOperationProxy(task).run(
dynamodb,
taskIndex,
DynamodbItemSchema(task).getItemsConsumer(pageBuilder)
)
}
pageBuilder.finish()
Exec.newTaskReport()
}

def cleanup(
override def cleanup(
taskSource: TaskSource,
schema: Schema,
taskCount: Int,
successTaskReports: JList[TaskReport]
): Unit = {
// TODO
val task: PluginTask = PluginTask.load(taskSource)
if (isDeprecatedOperationRequired(task))
DeprecatedDynamodbInputPlugin.cleanup(
taskSource,
schema,
taskCount,
successTaskReports
)
}

def guess(config: ConfigSource): ConfigDiff = {
// TODO
null
override def guess(config: ConfigSource): ConfigDiff = {
val task: PluginTask = PluginTask.load(config)
if (isDeprecatedOperationRequired(task))
return DeprecatedDynamodbInputPlugin.guess(config)
throw new UnsupportedOperationException
}

private def isDeprecatedOperationRequired(task: PluginTask): Boolean =
task.getOperation.isPresent
}
Loading