midgard

所属分类:云计算
开发工具:kotlin
文件大小:0KB
下载次数:0
上传日期:2023-06-02 09:41:00
上 传 者sh-1993
说明:  Midgard是Beam Kotlin的包装器,允许更简洁和更具表现力的代码。它删除了Beam样板代码,并提出了更多的函数式编程风格
(Midgard is a wrapper on Beam Kotlin, allowing more concise and expressive code. It removes Beam boilerplate code and proposes more Functional Programming style)

文件列表:
LICENSE (1064, 2023-11-27)
beam-kotlin.png (43869, 2023-11-27)
pom.xml (12382, 2023-11-27)
src/ (0, 2023-11-27)
src/main/ (0, 2023-11-27)
src/main/kotlin/ (0, 2023-11-27)
src/main/kotlin/fr/ (0, 2023-11-27)
src/main/kotlin/fr/groupbees/ (0, 2023-11-27)
src/main/kotlin/fr/groupbees/midgard/ (0, 2023-11-27)
src/main/kotlin/fr/groupbees/midgard/PCollectionExtensions.kt (11551, 2023-11-27)
src/main/kotlin/fr/groupbees/midgard/transforms/ (0, 2023-11-27)
src/main/kotlin/fr/groupbees/midgard/transforms/BaseElementFn.kt (1961, 2023-11-27)
src/main/kotlin/fr/groupbees/midgard/transforms/FlatMapElementFn.kt (8606, 2023-11-27)
src/main/kotlin/fr/groupbees/midgard/transforms/FlatMapProcessContextFn.kt (10515, 2023-11-27)
src/main/kotlin/fr/groupbees/midgard/transforms/MapElementFn.kt (8726, 2023-11-27)
src/main/kotlin/fr/groupbees/midgard/transforms/MapProcessContextFn.kt (10152, 2023-11-27)
src/main/kotlin/fr/groupbees/midgard/transforms/SerializableAction.kt (406, 2023-11-27)
src/test/ (0, 2023-11-27)
src/test/kotlin/ (0, 2023-11-27)
src/test/kotlin/fr/ (0, 2023-11-27)
src/test/kotlin/fr/groupbees/ (0, 2023-11-27)
src/test/kotlin/fr/groupbees/midgard/ (0, 2023-11-27)
src/test/kotlin/fr/groupbees/midgard/CodeSamples.kt (5376, 2023-11-27)
src/test/kotlin/fr/groupbees/midgard/PCollectionExtensionsTest.kt (23956, 2023-11-27)
src/test/kotlin/fr/groupbees/midgard/Player.kt (195, 2023-11-27)
src/test/kotlin/fr/groupbees/midgard/Team.kt (168, 2023-11-27)

# Midgard ![beam-kotlin](https://github.com/tosun-si/midgard/blob/master/beam-kotlin.png) Because **Beam** **Kotlin**, we created a new open source library called **Midgard** to : - Have more concise and expressive code - Remove **Beam** boilerplate code - Propose more **Functional Programming** style This module is a `Beam` wrapper on `Kotlin` and proposes some extensions on `PCollection` `DoFn` and `IO connectors` Behind the scene `Kotlin` extensions are used, the main advantage of this technic is adding behaviours and methods to an existing structure without affecting it. ## Versions compatibility between Beam and Midgard | Midgard | Beam | |---------|--------| | 0.15.0 | 2.44.0 | | 0.16.0 | 2.45.0 | | 0.17.0 | 2.46.0 | | 0.18.0 | 2.47.0 | | 0.19.0 | 2.48.0 | | 0.20.0 | 2.49.0 | | 0.21.0 | 2.50.0 | | 0.22.0 | 2.51.0 | | 0.23.0 | 2.52.0 | ## Installation of project The project is hosted on Maven repository.\ You can install it with all the build tools compatibles with Maven. Example with Maven and Gradle : #### Maven ```xml fr.groupbees midgard 0.23.0 ``` #### Gradle ```text implementation group: 'fr.groupbees', name: 'midgard', version: '0.23.0' ``` ## 1- Extensions on PCollection ### 1-1 Usual Beam operators : map, flatMap and filter Test data : ```kotlin val psgPlayers = listOf( Player(firstName = "Kylian", lastName = "Mbappe", 24), Player(firstName = "Marco", lastName = "Verrati", 28) ) val realPlayers = listOf( Player(firstName = "Karim", lastName = "Benzema", 35), Player(firstName = "Luca", lastName = "Modric", 39) ) // Given. val psgTeam = Team(name = "PSG", slogan = "Ici c'est Paris", psgPlayers) val realTeam = Team(name = "REAL", slogan = "Hala Madrid", realPlayers) ``` Example of usual `Beam` pipeline with `map`, `flatMap` and `filter` operations : ```kotlin val resultPlayers: PCollection = pipeline .apply("Create", Create.of(listOf(psgTeam, realTeam))) .apply( "To Team with Slogan V2", MapElements .into(TypeDescriptor.of(Team::class.java)) .via(SerializableFunction { it.copy(slogan = "${it.slogan} VERSION 2") }) ) .apply( "To Players", FlatMapElements .into(TypeDescriptor.of(Player::class.java)) .via(SerializableFunction { it.players }) ) .apply("Filter age > 25", Filter.by(SerializableFunction { it.age > 25 })) ``` The same pipeline with `Midgard` library : ```kotlin import fr.groupbees.midgard.* val resultPlayersMidgard: PCollection = pipeline .apply("Create", Create.of(listOf(psgTeam, realTeam))) .map("To Team with Slogan V2") { it.copy(slogan = "${it.slogan} VERSION 2") } .flatMap("To Players") { it.players } .filter("Filter age > 25") { it.age > 25 } ``` For each operator, there is its equivalent with `Midgard` : - `MapElements` -> `map` - `FlatMapElements` -> `flatMap` - `Filter` -> `filter` To use extensions offered by `Midgard`, you have to add the following import in the code : ``` import fr.groupbees.midgard.* ``` Another big advantage of using `Kotlin` extensions, is the possibility to mix native methods of the `PCollection` with those specific to `Midgard`. The previous example contains : Native method of the `PCollection` ```kotlin .apply("Create", Create.of(listOf(psgTeam, realTeam))) ``` Mixed with extensions and methods brought by `Midgard` : ```kotlin .apply("Create", Create.of(listOf(psgTeam, realTeam))) .map("To Team with Slogan V2") { it.copy(slogan = "${it.slogan} VERSION 2") } .flatMap("To Players") { it.players } .filter("Filter age > 25") { it.age > 25 } ``` The `map`, `flatMap` and `filter` operators take as parameters : - The name and pipeline step - Lambda expression or the implementation of the function, to apply the needed operation - The `Beam` `TypeDescriptor` is deduced inside the operators ### 1-2 Operators interacting with Beam DoFn lifecycle `Beam` allows interacting with the [DoFn](https://github.com/tosun-si/midgard/blob/master/https://beam.apache.org/documentation/programming-guide/) lifecycle (check **DoFn lifecycle** section) : - Setup - Start Bundle - Finish Bundle - Teardown To be able to use these steps in the lifecycle, we have to create a class that extends `DoFn` and override the needed function with the associated annotation, example : ```java @Setup public void setup() { // Setup action. } @StartBundle public void startBundle() { // Start Bundle action. } @FinishBundle public void finishBundle() { // Finish Bundle action. } @Teardown public void teardown() { // Teardown action. } ``` `Midgard` allows to propose `map` and `flatMap` operators and extensions while interacting with this lifecycle. #### Example with a `map` and an interaction with all the lifecycle functions : ```kotlin val resultTeamMidgardMapLifeCycle: PCollection = pipeline .apply("Create", Create.of(listOf(psgTeam, realTeam))) .mapFn( name = "To Team with Slogan V2", transform = { it.copy(slogan = "${it.slogan} VERSION 2") }, setupAction = { println("Setup Action") }, startBundleAction = { println("Start Bundle Action") }, finishBundleAction = { println("Finish Bundle Action") }, teardownAction = { println("Teardown Action") } ) ``` - `name` : the name of the current step - `transform` : current function containing the transformation logic - `setupAction` : action executed in the `Setup` `DoFn` lifecycle - `startBundleAction` : action executed in the `Start Bundle` `DoFn` lifecycle - `finishBundleAction` : action executed in the `Finish Bundle` `DoFn` lifecycle - `teardownAction` : action executed in the `Teardown` `DoFn` lifecycle #### Example with `flatMap` : ```kotlin val resultPlayersMidgardFlatMapLifeCycle: PCollection = pipeline .apply("Create", Create.of(listOf(psgTeam, realTeam))) .map("To Team with Slogan V2") { it.copy(slogan = "${it.slogan} VERSION 2") } .flatMapFn( name = "To Players", transform = { it.players }, setupAction = { println("Setup Action") }, startBundleAction = { println("Start Bundle Action") }, finishBundleAction = { println("Finish Bundle Action") }, teardownAction = { println("Teardown Action") } ) ``` ### 1-3 Operators interacting with Beam DoFn lifecycle and context Sometimes we need to access to the current `ProcessContext` while applying the current transformation.\ It's the case for example when we want to deal with side inputs. #### Example with a `map` operation ```kotlin // Simulate a side input for the slogan suffix. val slogansSideInput: PCollectionView = pipeline .apply("Read slogans", Create.of("VERSION 2")) .apply("Create as collection view", View.asSingleton()) val resultTeamMidgardMapContextLifeCycle: PCollection = pipeline .apply("Create", Create.of(listOf(psgTeam, realTeam))) .mapFnWithContext( name = "To Team with Slogan V2", transform = { context -> toTeamWithSloganSuffixFromSideInput(slogansSideInput, context) }, setupAction = { println("Setup Action") }, sideInputs = listOf(slogansSideInput), startBundleAction = { println("Start Bundle Action") }, finishBundleAction = { println("Finish Bundle Action") }, teardownAction = { println("Teardown Action") } ) private fun toTeamWithSloganSuffixFromSideInput( sideInput: PCollectionView, context: DoFn.ProcessContext ): Team { val currentTeam: Team = context.element() val sloganSuffixSideInput: String = context.sideInput(sideInput) return currentTeam.copy(slogan = "${currentTeam.slogan} $sloganSuffixSideInput") } ``` We simulated a side input with a slogan suffix as a `PCollectionView`.\ The `mapFnWithContext` method gives access to the current `DoFn` `ProcessContext` and allows : - To retrieve the current `String` value inside the side input - To retrieve the current `Team` input element The `toTeamWithSloganSuffixFromSideInput` method adds the side input suffix to the current slogan and returns a copy of the current object with the newly updated slogan field. #### Example with `flatMap` operation The principle is the same for `flatMapFnWithContext`, for the sake of simplicity we show an example without side input : ```kotlin val resultPlayersMidgardFlatMapContextLifeCycle: PCollection = pipeline .apply("Create", Create.of(listOf(psgTeam, realTeam))) .map("To Team with Slogan V2") { it.copy(slogan = "${it.slogan} VERSION 2") } .flatMapFnWithContext( name = "To Players", transform = { context -> context.element().players }, setupAction = { println("Setup Action") }, startBundleAction = { println("Start Bundle Action") }, finishBundleAction = { println("Finish Bundle Action") }, teardownAction = { println("Teardown Action") } ) ``` In this example, the current input element in the context is directly returned. ### Next steps and evolutions The library could add in the future, extensions on native `Beam` IO like : - TextIO - BigQueryIO - ...

近期下载者

相关文件


收藏者