kafboy

Category: Other
Development tool: Erlang
File size: 0KB
Downloads: 0
Upload date: 2016-01-18 10:09:07
Uploader: sh-1993
Description: A low-latency HTTP server for writing to Kafka

File list:
benchmarks/
include/
rel/
src/
VERSION
rebar.config

# kafboy

A low-latency HTTP server for writing to Kafka. Optimized for heavy loads and hundreds of partition workers, with support for batching and more. Written in Erlang. Powered by `ekaf` and `Cowboy`.

![ordered_round_robin](https://github.com/benchmarks/n30000_c100_strategy_random.png)

*See https://github.com/helpshift/ekaf for more information.*

## Architecture

With 0.8, Kafka clients take greater responsibility for deciding which broker and partition to publish to for a given topic. kafboy is an HTTP wrapper over the `ekaf` client that takes care of routing HTTP requests to the right Kafka broker socket. kafboy is cluster-aware: a request arriving on any node can be routed to the right process in the cluster.

Simply send a POST with the desired JSON to one of the following paths.

## Fire and forget

    % Fire-and-forget asynchronous call. The event is sent to Kafka immediately and asynchronously.
    POST /async/topic

## Synchronous calls

    % Synchronous call that returns with the response after sending to Kafka.
    % NOTE: a reply is not sent until after Kafka responds, so this is not recommended for low-latency needs.
    POST /sync/topic

## Batching

    % The event is added to a queue and sent to the broker in a batch.
    % Batch size and flush timeout are configurable.
    POST /batch/async/topic

The payload is expected to be JSON, but this can be configured to send the data as is. The server does very little else in terms of dealing with Kafka; it simply calls ekaf's produce function.

## Configuring kafboy

    {kafboy,[
        % optional. You get to edit the JSON here before it goes to Kafka.
        {kafboy_callback_edit_json, {my_module, massage_json}},
        % M:F({post, Topic, Req, Json, Callback}) will be called. Return what you want to send to Kafka.
        % If an error occurs, M:F({error, StatusCode, Message}) will be called.

        % optional.
        {kafboy_load_balancer, "http://localhost:8080/disco"},
        % Should return the plaintext name of a node with the right cookie, eg: `node2@some-host`.
        % Can be used to distribute work to other nodes if ekaf thinks this one is too busy.

        % optional, see more in kafboy_app.erl
        {kafboy_routes_async_batch,["/1/foo/:topic"]},
        {kafboy_routes_async,[]},
        {kafboy_routes_sync,[]}
    ]}

In this example, you have to implement `my_module:massage_json/1`, along the lines of

    massage_json({post, Topic, _Req, Body, Callback})->
        Callback ! { edit_json_callback, Topic, Body }.

Here is a more elaborate example:

    %% Let's check the contents of Body,
    %% and if it's valid, add an extra field
    %% and then submit to Kafka
    massage_json({post, Topic, _Req, Body, CallbackPid})->
        case Body of
            [{<<"hello">>, Foo}] ->
                % either reply like this
                CallbackPid ! { edit_json_callback, Topic, Foo };
            [] ->
                CallbackPid ! { edit_json_callback, {error, <>}};
            _ ->
                %% I want to reply first
                CallbackPid ! { edit_json_callback, {200, <<"{\"ok\":\"fast reply\"}">>}},
                %% then call ekaf directly, adding this msg to a batch
                Final = jsx:encode([{<<"extra">>,<<"true">>}| Body]),
                ekaf:produce_async_batched(Topic, Final)
        end;
    massage_json({error, Status, Message}) ->
        io:format("~n some ~p error: ~p",[Status, Message]),
        ok.

kafboy handles sending batch requests (the batch size is configurable), disconnections from brokers, and max retries.

To see the API of ekaf, see http://github.com/helpshift/ekaf

## Quick start

On terminal 1

    git clone https://github.com/helpshift/kafboy
    cd kafboy
    rebar get-deps compile
    erl -pa deps/*/ebin -pa ebin -s kafboy_demo

On terminal 2

    curl localhost:9903/batch/async/ekaf -XPOST -d 'test=a'
    {"ok":"fast reply"}

    curl localhost:9903/batch/async/ekaf -XPOST -d 'hello=a'
    {"ok":1}

    curl localhost:9903/batch/async/ekaf -XPOST
    {"error":"ekaf.insufficient"}

## Configuring ekaf

#### An example ekaf config

    {ekaf,[
        % required.
        {ekaf_bootstrap_broker, {"localhost", 9091} },
        % Pass the {BrokerHost,Port} of at least one permanent broker. Ideally this should be
        % the IP of a load balancer so that any broker can be contacted.

        % optional
        {ekaf_per_partition_workers,100},
        % How big the connection pool is per partition.
        % eg: if the topic has 3 partitions, then with this setting 300 workers will be started.

        % optional
        {ekaf_max_buffer_size, [{<<"topic">>,10000},          % for a specific topic
                                {ekaf_max_buffer_size,100}]}, % for other topics
        % How many events the worker should wait for before flushing to Kafka as a batch.

        % optional
        {ekaf_partition_strategy, random}
        % If you are not bothered about the order, use random for speed;
        % otherwise the default is ordered_round_robin.
    ]},

To see how to configure the number of workers per topic+partition, the buffer batch size, the buffer flush TTL, and more, see the extensive README for `ekaf` at https://github.com/helpshift/ekaf

## License

```
Copyright 2014, Helpshift, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
```

Add a feature request at https://github.com/helpshift/ekaf or check the ekaf web server at https://github.com/helpshift/kafboy
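
The kafboy and ekaf sections shown earlier are both ordinary application environment entries, so they can sit side by side in one config file. The following is a minimal sketch, assuming a standard OTP `sys.config` (the file name and location are assumptions, not something this README specifies). All keys and values are copied from the examples above, and `my_module` is still a placeholder for your own callback module.

```erlang
%% sys.config (assumed file name; adjust to your release layout)
[
 {kafboy, [
   %% optional: edit the JSON before it goes to Kafka
   {kafboy_callback_edit_json, {my_module, massage_json}},
   %% optional: custom routes, as in the kafboy example
   {kafboy_routes_async_batch, ["/1/foo/:topic"]},
   {kafboy_routes_async, []},
   {kafboy_routes_sync, []}
 ]},
 {ekaf, [
   %% required: at least one permanent broker
   {ekaf_bootstrap_broker, {"localhost", 9091}},
   %% optional: pool size per partition
   {ekaf_per_partition_workers, 100},
   %% optional: batch size per topic, plus a default for other topics
   {ekaf_max_buffer_size, [{<<"topic">>, 10000},
                           {ekaf_max_buffer_size, 100}]},
   %% optional: random trades ordering for speed
   {ekaf_partition_strategy, random}
 ]}
].
```

One way to load such a file is the standard `erl -config sys` flag. Whether `kafboy_demo` overrides any of these values at startup is not confirmed here.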
