dafka

Category: Other
Development tool: C
File size: 0KB
Downloads: 0
Upload date: 2020-09-11 18:40:36
Uploader: sh-1993
Description: Dafka is a decentralized distributed streaming platform.

File list:
.clang-format (1487, 2020-09-11)
.editorconfig (546, 2020-09-11)
.travis.yml (4835, 2020-09-11)
AUTHORS (116, 2020-09-11)
CMakeLists.txt (26137, 2020-09-11)
Dockerfile (1158, 2020-09-11)
Findcucumber.cmake (1771, 2020-09-11)
Findczmq.cmake (1684, 2020-09-11)
Findleveldb.cmake (1747, 2020-09-11)
Findlibzmq.cmake (3558, 2020-09-11)
LICENSE (16725, 2020-09-11)
Makefile.am (2165, 2020-09-11)
api/ (0, 2020-09-11)
api/dafka_consumer.api (1741, 2020-09-11)
api/dafka_consumer_msg.api (1715, 2020-09-11)
api/dafka_producer.api (190, 2020-09-11)
api/dafka_producer_msg.api (2334, 2020-09-11)
api/dafka_proto.api (5703, 2020-09-11)
autogen.sh (1926, 2020-09-11)
builds/ (0, 2020-09-11)
builds/check_zproject/ (0, 2020-09-11)
builds/check_zproject/ci_build.sh (1928, 2020-09-11)
builds/check_zproto/ (0, 2020-09-11)
builds/check_zproto/ci_build.sh (648, 2020-09-11)
builds/cmake/ (0, 2020-09-11)
builds/cmake/Config.cmake.in (125, 2020-09-11)
builds/cmake/Modules/ (0, 2020-09-11)
builds/cmake/Modules/ClangFormat.cmake (2314, 2020-09-11)
builds/cmake/ci_build.sh (10840, 2020-09-11)
builds/cmake/clang-format-check.sh.in (927, 2020-09-11)
ci_build.sh (22385, 2020-09-11)
ci_deploy.sh (1112, 2020-09-11)
ci_deploy_obs.sh (900, 2020-09-11)
configure.ac (37758, 2020-09-11)
doc/ (0, 2020-09-11)
... ...

[![GitHub release](https://img.shields.io/github/release/zeromq/dafka.svg)](https://github.com/zeromq/dafka/releases)
[![license](https://img.shields.io/badge/license-MPLV2.0-blue.svg)](https://github.com/zeromq/dafka/blob/master/LICENSE)

# Dafka - Decentralized Distributed Streaming Platform

[![Build Status](https://travis-ci.org/zeromq/dafka.png?branch=master)](https://travis-ci.org/zeromq/dafka)

## Contents

**[Overview](#overview)**
* [Scope and Goals](#scope-and-goals)
* [Topics and Partitions](#topics-and-partitions)
* [Stores](#stores)
* [Producer](#producer)
* [Consumer](#consumer)
* [Tower](#tower)
* [Guarantees](#guarantees)

**[Design](#design)**
* [Producing and Storing](#producing-and-storing)
* [Subscribing to topics](#subscribing-to-topics)
* [Missed records](#missed-records)

**[Implementation](#implementation)**

**[Ownership and License](#ownership-and-license)**

**[Using Dafka](#using-dafka)**
* [Building and Installing on Linux and macOS](#building-and-installing-on-linux-and-macos)
* [Quickstart](#quickstart)
  * [Step 1: Start the Dafka Tower Daemon](#step-1-start-the-dafka-tower-daemon)
  * [Step 2: Write some events into a topic](#step-2-write-some-events-into-a-topic)
  * [Step 3: Read the events](#step-3-read-the-events)
* [Getting started](#getting-started)
* [Linking with an Application](#linking-with-an-application)
* [API v1 Summary](#api-v1-summary)
  * [dafka_consumer - Implements the dafka consumer protocol](#dafka_consumer---implements-the-dafka-consumer-protocol)
  * [dafka_producer - Implements the dafka producer protocol](#dafka_producer---implements-the-dafka-producer-protocol)
  * [dafka_store - no title found](#dafka_store---no-title-found)
  * [dafka_tower - no title found](#dafka_tower---no-title-found)

**[Contributing](#contributing)**
* [Documentation](#documentation)
* [Development](#development)
* [Hints to Contributors](#hints-to-contributors)
* [Code Generation](#code-generation)
  * [Docker](#docker)
* [This Document](#this-document)

## Overview

### Scope and Goals

Dafka is a decentralized distributed streaming platform. What exactly does that mean?

A streaming platform has three key capabilities:

* Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
* Store streams of records in a fault-tolerant, durable way.
* Process streams of records as they occur.

Dafka is generally used for two broad classes of applications:

* Building real-time streaming data pipelines that reliably get data between systems or applications
* Building real-time streaming applications that transform or react to the streams of data

To understand how Dafka does these things, let's dive in and explore Dafka's capabilities from the bottom up.

First a few concepts:

* Dafka is run as a cluster on one or more servers.
* The Dafka cluster stores streams of records in categories called topics.
* Each record consists of an arbitrary value.
* Producers send records to the cluster and directly to the consumers.
* Missed records are obtained either from the producer or from the cluster.

In Dafka the communication between clients is done with a simple, high-performance, language- and transport-agnostic protocol. This protocol is versioned and maintains backwards compatibility with older versions. We provide a C and a Java client for Dafka.

### Topics and Partitions

Dafka provides an abstraction for records called a topic. A topic is a name to which records are published.
Topics in Dafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the records written to it. Each Dafka topic consists of at least one partition, which looks like this:

*(Figure 1)*

Each partition is an ordered, immutable sequence of records that is continually appended to. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition. The Dafka cluster durably persists all published records — whether or not they have been consumed.

*(Figure 2)*

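To make these offset semantics concrete, here is a minimal illustrative sketch (not part of the Dafka API) of a partition as an append-only log, where a record's offset is simply its position and each reader tracks its own position:

```c
#include <assert.h>
#include <stddef.h>

//  Illustrative only: a partition as an append-only sequence of records.
//  A record's offset is its index; the log itself never tracks readers.
#define MAX_RECORDS 1024

typedef struct {
    const char *records [MAX_RECORDS];
    size_t head;                    //  Offset of the next record to append
} partition_t;

//  Append a record and return the offset it was assigned
static size_t
partition_append (partition_t *self, const char *value)
{
    assert (self->head < MAX_RECORDS);
    self->records [self->head] = value;
    return self->head++;
}

//  Read the record at the consumer's own offset, advancing it; returns
//  NULL when the consumer has caught up with the head of the partition
static const char *
partition_read (partition_t *self, size_t *consumer_offset)
{
    if (*consumer_offset >= self->head)
        return NULL;
    return self->records [(*consumer_offset)++];
}
```

Because each reader owns its offset, re-reading older records or skipping ahead is just an assignment to `consumer_offset`.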
Consumers maintain their own offset while reading records from a partition. In fact, neither the Dafka cluster nor the producers keep track of the consumers' offsets. This design allows consumers to either reset their offset to an older offset and re-read records, or set their offset to a newer offset and skip ahead. In that way consumers have no influence on the cluster, the producers or other consumers. They can simply come and go as they please.

### Stores

Partitions are distributed to the Dafka cluster, which consists of Dafka stores. Each partition is replicated to each store for fault tolerance.

### Producer

Producers publish records to a topic. Each producer creates its own partition that only it publishes to. Records are sent directly to *stores* and *consumers*. When a producer goes offline, its partition is still available to consumers from the Dafka stores.

### Consumer

Consumers subscribe to a topic. Each consumer will receive records published to that topic from all partitions.

### Tower

Each Dafka cluster has one or more towers. The towers are used to connect producers, consumers and stores to each other. At this point no traffic is proxied through the towers.

### Guarantees

Dafka gives the following guarantees:

* Records sent by a producer are appended in the stores in the same order they are sent.
* Consumers will provide records of a partition to the user in the same order they are sent by the producer.

## Design

We designed Dafka to be a drop-in replacement for Apache Kafka.

While Kafka makes it easy for consumers to come and go as they like, its consumer group feature, which relies on finding consensus in a group of peers, makes joining very expensive. It can take seconds before a consumer is ready to consume records. The same is true for producers.

Dafka tries to avoid finding consensus and therefore intentionally avoids features like consumer groups, in favor of higher throughput, lower latency and faster consumer and producer initialization.

This design section discusses the different message types of the Dafka protocol.

### Producing and Storing

Producers publish records using the RECORD message type. RECORD messages are sent directly to all connected stores as well as all connected consumers. Once a producer has published its first record, it starts sending HEAD messages at a regular interval, informing both stores and consumers about the last published record, which gives stores and consumers a chance to figure out whether or not they missed one or more records.

*(Figure 3)*

Because producers publish records directly to consumers, the presence of a store is not strictly required. When a new consumer joins, it can request the producers to supply all already published records. Therefore the producer must store all published records that are not yet stored by a configurable minimum number of stores. To inform a producer about the successful storing of a record, the stores send an ACK message to the producer.

*(Figure 4)*

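As an illustration of the producer-side bookkeeping described above, here is a sketch only; `MIN_STORES` stands in for the configurable minimum and none of this reflects actual Dafka internals:

```c
#include <stdbool.h>
#include <stdint.h>

//  Illustrative only: a producer may drop a record from its own memory
//  once a configurable minimum number of stores have ACKed it.
#define MIN_STORES 1            //  Hypothetical configured minimum

typedef struct {
    uint64_t offset;            //  Offset of the pending record
    int acks;                   //  Distinct stores that ACKed it so far
} pending_record_t;

//  Handle an ACK from a store; returns true when the record is stored
//  by enough stores and may be released by the producer
static bool
on_store_ack (pending_record_t *rec, uint64_t acked_offset)
{
    if (rec->offset != acked_offset)
        return false;           //  ACK refers to some other record
    rec->acks++;
    return rec->acks >= MIN_STORES;
}
```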
### Subscribing to topics

Consumers will only start listening for HEAD messages once they have subscribed to a topic. Whenever a consumer creates a new subscription, it is not enough to listen to the producers' HEAD messages to catch up on the current offset of their partitions. For one, there is a time penalty until the producers' HEAD intervals trigger, and more severely, a producer may already have disappeared. Hence consumers send a GET-HEADS message to the stores to request the offset for each partition they stored for a topic.

*(Figure 5)*

As a response, each store will answer with DIRECT-HEAD messages, each containing the offset for a partition.

*(Figure 6)*

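A sketch of how a consumer might derive its starting offset from those answers (illustrative only; the `earliest`/`latest` behaviour mirrors the `consumer/offset/reset` setting described in the API section below, not the actual implementation):

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

//  Illustrative only: pick a starting offset for one partition from the
//  DIRECT-HEAD answers received from the stores.
static uint64_t
starting_offset (const uint64_t *store_heads, size_t n_answers,
                 bool reset_earliest)
{
    if (n_answers == 0)
        return 0;               //  No store knows this partition yet

    //  The most up-to-date store defines the current head offset
    uint64_t head = store_heads [0];
    for (size_t i = 1; i < n_answers; i++)
        if (store_heads [i] > head)
            head = store_heads [i];

    //  earliest: consume the partition from the beginning;
    //  latest: skip ahead and only consume records published from now on
    return reset_earliest ? 0 : head + 1;
}
```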
### Missed records

Consumers can discover missed records either by receiving HEAD messages or by receiving a RECORD message with a higher offset than they currently have for a certain partition. In order to fetch missed records, consumers send a FETCH message to all connected stores and to the producer of those records.

*(Figure 7)*

As a response to a FETCH message, a store and/or producer may send all missed records that the consumer requested directly to the consumer with DIRECT-RECORD messages.

*(Figure 8)*

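The gap-detection logic behind this is simple; here is a hedged sketch, independent of the actual implementation:

```c
#include <stdint.h>

//  Illustrative only: compare the offset announced in a HEAD (or seen in
//  a RECORD) with the highest offset consumed so far, and derive the
//  range of records to request with FETCH.
typedef struct {
    uint64_t last_seen;         //  Highest offset consumed so far
} partition_state_t;

//  Returns the number of missed records and sets *fetch_from to the
//  first missing offset; returns 0 when nothing was missed
static uint64_t
missed_records (partition_state_t *state, uint64_t announced_offset,
                uint64_t *fetch_from)
{
    if (announced_offset <= state->last_seen)
        return 0;
    *fetch_from = state->last_seen + 1;
    return announced_offset - state->last_seen;
}
```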
## Implementation

The implementation is documented in RFC [46/DAFKA](https://rfc.zeromq.org/spec/46).

## Ownership and License

The contributors are listed in AUTHORS. This project uses the MPL v2 license, see LICENSE.

Dafka uses the [C4.1 (Collective Code Construction Contract)](http://rfc.zeromq.org/spec:22) process for contributions.

Dafka uses the [CLASS (C Language Style for Scalability)](http://rfc.zeromq.org/spec:21) guide for code style.

To report an issue, use the [Dafka issue tracker](https://github.com/zeromq/dafka/issues) at github.com.

## Using Dafka

### Building and Installing on Linux and macOS

To start with, you need at least these packages:

* `git-all` -- git is how we share code with other people.
* `build-essential`, `libtool`, `pkg-config` - the C compiler and related tools.
* `autotools-dev`, `autoconf`, `automake` - the GNU autoconf makefile generators.
* `cmake` - the CMake makefile generators (an alternative to autoconf).

Plus some others:

* `uuid-dev`, `libpcre3-dev` - utility libraries.
* `valgrind` - a useful tool for checking your code.
* `pkg-config` - an optional useful tool to make building with dependencies easier.

Which we install like this (using the Debian-style apt-get package manager):

    sudo apt-get update
    sudo apt-get install -y \
        git-all build-essential libtool \
        pkg-config autotools-dev autoconf automake cmake \
        uuid-dev libpcre3-dev valgrind

    # only execute this next line if interested in updating the man pages as well (adds to build time):
    sudo apt-get install -y asciidoc

Here's how to build Dafka from GitHub (building from packages is very similar: you don't clone a repo, you unpack a tarball), including the libzmq (ZeroMQ core) library (NOTE: skip `ldconfig` on macOS):

    git clone git://github.com/zeromq/libzmq.git
    cd libzmq
    ./autogen.sh
    # do not specify "--with-libsodium" if you prefer to use the internal
    # tweetnacl security implementation (recommended for development)
    ./configure --with-libsodium
    make check
    sudo make install
    sudo ldconfig
    cd ..

    git clone git://github.com/zeromq/czmq.git
    cd czmq
    ./autogen.sh && ./configure && make check
    sudo make install
    sudo ldconfig
    cd ..

    git clone git://github.com/zeromq/dafka.git
    cd dafka
    ./autogen.sh && ./configure && make check
    sudo make install
    sudo ldconfig
    cd ..

To verify everything got installed correctly, run:

    make check

### Quickstart

If you are interested in getting started with Dafka, follow the instructions below.

#### Step 1: Start the Dafka Tower Daemon

We'll start using Dafka in a simple single producer/single consumer scenario, using the `dafka_console_producer` and `dafka_console_consumer` command-line utilities.

```sh
$ dafka_towerd
```

Note: The tower will open two sockets, one on port 5556 to get notified by joining peers and one on port 5557 to notify joined peers about joining peers.

#### Step 2: Write some events into a topic

Run the console producer to write a few events into the `hello` topic. Each line you enter will result in a separate event being written to the topic.

```sh
$ dafka_console_producer hello
A first event
A second event
```

You can stop the producer client with Ctrl-C at any time.

Note: If no configuration is provided, the producer tries to connect to a tower on localhost.

#### Step 3: Read the events

Open another terminal session and run the console consumer to read the events you just created:

```sh
$ dafka_console_consumer hello
```

You can stop the consumer client with Ctrl-C at any time.
Because events are kept by the producer until at least one store has acknowledged that they are stored, they can be read as many times and by as many consumers as you want. You can easily verify this by opening yet another terminal session and re-running the previous command.

Note: If no configuration is provided, the consumer tries to connect to a tower on localhost.

### Getting started

The following getting-started guide shows you how to use the producer and consumer API. First we construct a Dafka producer with the default configuration and then publish the message `HELLO WORLD` to the topic `hello`.

```c
zconfig_t *config = zconfig_new ("root", NULL);
const char *topic = "hello";

dafka_producer_args_t producer_args = { topic, config };
zactor_t *producer = zactor_new (dafka_producer, &producer_args);

dafka_producer_msg_t *msg = dafka_producer_msg_new ();
dafka_producer_msg_set_content (msg, "HELLO WORLD");
dafka_producer_msg_send (msg, producer);

dafka_producer_msg_destroy (&msg);
zactor_destroy (&producer);
zconfig_destroy (&config);
```

To consume this message we construct a Dafka consumer, let it subscribe to the topic `hello`, receive the message and then print the content of the received message.

```c
zconfig_t *config = zconfig_new ("root", NULL);
const char *topic = "hello";

dafka_consumer_args_t args = { .config = config };
dafka_consumer_t *consumer = dafka_consumer_new (&args);
dafka_consumer_subscribe (consumer, topic);

dafka_consumer_msg_t *msg = dafka_consumer_msg_new ();
while (true) {
    int rc = dafka_consumer_msg_recv (msg, consumer);
    if (rc == -1)
        break;      //  Interrupted

    char *content_str = dafka_consumer_msg_strdup (msg);
    printf ("%s\n", content_str);
    zstr_free (&content_str);
}

dafka_consumer_msg_destroy (&msg);
dafka_consumer_destroy (&consumer);
zconfig_destroy (&config);
```

### Linking with an Application

Include `dafka.h` in your application and link with libdafka. Here is a typical gcc link command:

    gcc myapp.c -o myapp -ldafka -lczmq -lzmq
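Putting the pieces together, a minimal `myapp.c` for that link command might look like this (assembled from the producer snippet above; it assumes a tower is reachable on localhost):

```c
//  myapp.c - minimal Dafka producer application
//  Build: gcc myapp.c -o myapp -ldafka -lczmq -lzmq
#include <dafka.h>

int main (void)
{
    zconfig_t *config = zconfig_new ("root", NULL);

    //  Create a producer for the 'hello' topic and publish one record
    dafka_producer_args_t producer_args = { "hello", config };
    zactor_t *producer = zactor_new (dafka_producer, &producer_args);

    dafka_producer_msg_t *msg = dafka_producer_msg_new ();
    dafka_producer_msg_set_content (msg, "HELLO WORLD");
    dafka_producer_msg_send (msg, producer);

    dafka_producer_msg_destroy (&msg);
    zactor_destroy (&producer);
    zconfig_destroy (&config);
    return 0;
}
```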
### API v1 Summary

This is the API provided by Dafka v1.x, in alphabetical order.

#### dafka_consumer - Implements the dafka consumer protocol

dafka_consumer - Consumes messages either directly from producers or from stores.

TODO:
* Prioritize DIRECT_RECORD messages over RECORD; this will avoid discarding messages when catching up.

This is the class interface:

```h
//  This is a stable class, and may not change except for emergencies. It
//  is provided in stable builds.

//  Creates a new dafka consumer client that runs in its own background thread.
//
//  The args parameter consists of configuration and record sink.
//
//  If a record sink is provided this socket will be used to send the consumer
//  messages to.
//
//  The configuration argument takes settings for both the consumer and the
//  beacon, see below.
//
//  Consumer configuration:
//  * consumer/offset/reset = earliest|latest (default: latest)
//  * consumer/high_watermark (default: 1.000.000)
//  * consumer/verbose = 0|1 (default: 0 -> false)
//
//  Beacon configuration:
//  * beacon/interval (default: 1000) in ms
//  * beacon/verbose = 0|1 (default: 0 -> false)
//  * beacon/sub_address (default: tcp://127.0.0.1:5556)
//  * beacon/pub_address (default: tcp://127.0.0.1:5557)
DAFKA_EXPORT dafka_consumer_t *
    dafka_consumer_new (dafka_consumer_args_t *args);

//  Destroys an instance of dafka consumer client by gracefully stopping its
//  background thread.
DAFKA_EXPORT void
    dafka_consumer_destroy (dafka_consumer_t **self_p);

//  Subscribe to a given topic.
DAFKA_EXPORT int
    dafka_consumer_subscribe (dafka_consumer_t *self, const char *subject);

//  Unsubscribe from a topic currently subscribed to.
DAFKA_EXPORT int
    dafka_consumer_unsubscribe (dafka_consumer_t *self, const char *subject);

//  Returns the address of the consumer instance.
DAFKA_EXPORT const char *
    dafka_consumer_address (dafka_consumer_t *self);

//  Get the current subscription as list of strings.
DAFKA_EXPORT zlist_t *
    dafka_consumer_subscription (dafka_consumer_t *self);

//  Returns the internal record source socket.
DAFKA_EXPORT zsock_t *
    dafka_consumer_record_source (dafka_consumer_t *self);

//  Self test of this class.
DAFKA_EXPORT void
    dafka_consumer_test (bool verbose);
```

Please add '@interface' section in './../src/dafka_consumer.c'.

This is the class self test code:

```c
zconfig_t *config = zconfig_new ("root", NULL);
zconfig_put (config, "test/verbose", verbose ? "1" : "0");
zconfig_put (config, "beacon/interval", "50");
zconfig_put (config, "beacon/verbose", verbose ? "1" : "0");
zconfig_put (config, "beacon/sub_address", "inproc://consumer-tower-sub");
zconfig_put (config, "beacon/pub_address", "inproc://consumer-tower-pub");
zconfig_put (config, "tower/verbose", verbose ? "1" : "0");
zconfig_put (config, "tower/sub_address", "inproc://consumer-tower-sub");
zconfig_put (config, "tower/pub_address", "inproc://consumer-tower-pub");
zconfig_put (config, "consumer/verbose", verbose ? "1" : "0");
zconfig_put (config, "producer/verbose", verbose ? "1" : "0");
zconfig_put (config, "store/verbose", verbose ? "1" : "0");
zconfig_put (config, "store/db", SELFTEST_DIR_RW "/storedb");

zactor_t *tower = zactor_new (dafka_tower_actor, config);

// --------------
// Protocol Tests
// --------------

// Scenario: STORE-HELLO -> CONSUMER-HELLO without subscription
//   Given a dafka consumer with no subscriptions
//   When a STORE-HELLO command is sent by a store
//   Then the consumer responds with CONSUMER-HELLO and 0 topics
zconfig_put (config, "consumer/offset/reset", "earliest");

zactor_t *test_peer = zactor_new (dafka_test_peer, config);
assert (test_peer);

// GIVEN a dafka consumer with no subscription
dafka_consumer_args_t consumer_args = { .config = config };
dafka_consumer_t *consumer = dafka_consumer_new (&consumer_args);
assert (consumer);
zclock_sleep (250);     //  Make sure both peers are connected to each other

zlist_t *subscription = dafka_consumer_subscription (consumer);
assert (zlist_size (subscription) == 0);

// WHEN a STORE-HELLO command is sent by a store
dafka_test_peer_send_store_hello (test_peer, dafka ... ...
```
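Based on the interface above, here is a short usage sketch of the subscription calls (illustrative; error handling omitted, and it assumes a default-configured consumer as in the getting-started example):

```c
zconfig_t *config = zconfig_new ("root", NULL);
zconfig_put (config, "consumer/offset/reset", "earliest");  //  Re-read old records

dafka_consumer_args_t args = { .config = config };
dafka_consumer_t *consumer = dafka_consumer_new (&args);

dafka_consumer_subscribe (consumer, "hello");
dafka_consumer_subscribe (consumer, "world");

//  Inspect the current subscriptions
zlist_t *topics = dafka_consumer_subscription (consumer);
char *topic = (char *) zlist_first (topics);
while (topic) {
    printf ("subscribed to: %s\n", topic);
    topic = (char *) zlist_next (topics);
}

dafka_consumer_unsubscribe (consumer, "world");

dafka_consumer_destroy (&consumer);
zconfig_destroy (&config);
```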
