- Notifications
You must be signed in to change notification settings - Fork31
Elixir library client for work with ClickHouse
balance-platform/pillar
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
Elixir client forClickHouse, a fast open-sourceOnline Analytical Processing (OLAP) database management system.
Addpillar
to your list of dependencies inmix.exs
:
defdepsdo[{:pillar,"~> 0.39.0"}]end
Here's a simple example to get started with Pillar:
# Create a direct connectionconn=Pillar.Connection.new("http://user:password@localhost:8123/database")# Execute a SELECT query{:ok,result}=Pillar.select(conn,"SELECT * FROM users LIMIT 10")# Execute a parameterized query{:ok,result}=Pillar.query(conn,"SELECT * FROM users WHERE created_at > {min_date} LIMIT {limit}",%{min_date:"2023-01-01",limit:100})# Insert data{:ok,_}=Pillar.insert(conn,"INSERT INTO events (user_id, event_type, created_at) VALUES ({user_id}, {event_type}, {created_at})",%{user_id:123,event_type:"login",created_at:DateTime.utc_now()})
Pillar offers a comprehensive set of features for working with ClickHouse:
The most straightforward way to use Pillar is by creating a direct connection to your ClickHouse server:
# Create a connection to your ClickHouse serverconn=Pillar.Connection.new("http://user:password@localhost:8123/database")# Parameters are passed in curly braces {} in the SQL query# The map structure provides values for these parameterssql="SELECT count(*) FROM users WHERE lastname = {lastname}"params=%{lastname:"Smith"}{:ok,result}=Pillar.query(conn,sql,params)result#=> [%{"count(*)" => 347}]
You can also specify additional connection options:
conn=Pillar.Connection.new("http://user:password@localhost:8123/database",%{timeout:30_000,# Connection timeout in millisecondspool_timeout:5_000,# Pool timeout in millisecondsrequest_timeout:60_000# Request timeout in milliseconds})
For production environments, using a connection pool is highly recommended. This approach provides:
- Efficient connection management
- Supervised workers
- Load balancing across multiple ClickHouse servers
- Better performance under high load
defmoduleClickhouseMasterdousePillar,connection_strings:["http://user:password@host-master-1:8123/database","http://user:password@host-master-2:8123/database"],name:__MODULE__,pool_size:15,pool_timeout:10_000,# Time to wait for a connection from the pooltimeout:30_000# Default query timeoutend# Start the connection pool as part of your supervision treeClickhouseMaster.start_link()# Execute queries using the pool{:ok,result}=ClickhouseMaster.select("SELECT * FROM users WHERE age > {min_age}",%{min_age:21}){:ok,_}=ClickhouseMaster.insert("INSERT INTO logs (message) VALUES ({message})",%{message:"User logged in"})
The pool automatically manages connection acquisition and release, and can handle multiple ClickHouse servers for load balancing and high availability.
Asynchronous inserts are useful for non-blocking operations when you don't need to wait for a response. This is particularly valuable for:
- Logging events
- Metrics collection
- Any high-volume insert operations where immediate confirmation isn't required
# Using a connection pool (recommended approach)ClickhouseMaster.async_insert("INSERT INTO events (user_id, event, timestamp) VALUES ({user_id}, {event}, {timestamp})",%{user_id:user.id,event:"password_changed",timestamp:DateTime.utc_now()})# => :ok# The request is sent and the function returns immediately without waiting for a response
Note: Async inserts are only available when using a connection pool created withuse Pillar
. If you attempt to usePillar.async_insert/4
directly with a connection structure, it will raise an error.
The bulk insert buffer feature allows you to collect records in memory and insert them in batches at specified intervals. This is highly efficient for:
- High-frequency event logging
- Metrics collection
- Any scenario where you need to insert many small records
This feature requires aPool of workers to be set up first.
defmoduleBulkToLogsdousePillar.BulkInsertBuffer,# Reference to your Pillar connection poolpool:ClickhouseMaster,# Target table for insertstable_name:"logs",# How often to flush buffered records (seconds)# Default is 5 seconds if not specifiedinterval_between_inserts_in_seconds:5,# Optional error handler functionon_errors:&__MODULE__.dump_to_file/2,# Maximum records to buffer before forcing a flush# Optional, defaults to 10000max_buffer_size:5000@doc""" Error handler that stores failed inserts into a file Parameters: - result: The error result from ClickHouse - records: The batch of records that failed to insert """defdump_to_file(_result,records)dotimestamp=DateTime.utc_now()|>DateTime.to_string()|>String.replace(":","-")directory="bad_inserts"# Ensure the directory existsFile.mkdir_p!(directory)# Write failed records to a fileFile.write("#{directory}/#{timestamp}.log",inspect(records,pretty:true))end@doc""" Alternative error handler that attempts to retry failed inserts Note: Retrying can be risky in case of persistent errors """defretry_insert(_result,records)do# Add a short delay before retryingProcess.sleep(1000)__MODULE__.insert(records)endend
Usage example:
# Records are buffered in memory until the flush interval:ok=BulkToLogs.insert(%{value:"online",count:133,datetime:DateTime.utc_now()}):ok=BulkToLogs.insert(%{value:"online",count:134,datetime:DateTime.utc_now()}):ok=BulkToLogs.insert(%{value:"offline",count:42,datetime:DateTime.utc_now()})# All these records will be inserted in a single batch after the configured interval (5 seconds by default)
Theon_errors
parameter is a callback function that will be invoked if an error occurs during bulk insert. This is useful for:
- Logging failed inserts
- Writing failed records to a backup location
- Implementing custom retry logic
The callback receives two parameters:
- The error result from ClickHouse
- The batch of records that failed to insert
Pillar provides a migrations system to help you manage your ClickHouse database schema changes in a version-controlled manner. This feature is particularly useful for:
- Creating tables
- Modifying schema
- Ensuring consistent database setup across environments
- Tracking schema changes over time
Migrations can be generated with the mix task:
mix pillar.gen.migration create_events_table
This creates a new migration file inpriv/pillar_migrations
with a timestamp prefix, for example:priv/pillar_migrations/20250528120000_create_events_table.exs
defmodulePillar.Migrations.CreateEventsTabledodefupdo""" CREATE TABLE IF NOT EXISTS events ( id UUID, user_id UInt64, event_type String, payload String, created_at DateTime ) ENGINE = MergeTree() ORDER BY (created_at, id) """end# Optional: Implement a down function for rollbacksdefdowndo"DROP TABLE IF EXISTS events"endend
For complex scenarios where you need to execute multiple statements in a single migration, return a list of strings:
defmodulePillar.Migrations.CreateMultipleTablesdodefupdo# For multi-statement migrations, return a list of strings[""" CREATE TABLE IF NOT EXISTS events ( id UUID, user_id UInt64, event_type String, created_at DateTime ) ENGINE = MergeTree() ORDER BY (created_at, id) """,""" CREATE TABLE IF NOT EXISTS event_metrics ( date Date, event_type String, count UInt64 ) ENGINE = SummingMergeTree(count) ORDER BY (date, event_type) """]endend
You can also dynamically generate migrations:
defmodulePillar.Migrations.CreateShardedTablesdodefupdo# Generate 5 sharded tables(0..4)|>Enum.map(fni->""" CREATE TABLE IF NOT EXISTS events_shard_#{i} ( id UUID, user_id UInt64, event_type String, created_at DateTime ) ENGINE = MergeTree() ORDER BY (created_at, id) """end)endend
To run migrations, create a mix task:
defmoduleMix.Tasks.MigrateClickhousedouseMix.Task@shortdoc"Runs ClickHouse migrations"defrun(args)do# Start any necessary applicationsApplication.ensure_all_started(:pillar)# Parse command line arguments if needed{opts,_,_}=OptionParser.parse(args,strict:[env::string])env=Keyword.get(opts,:env,"dev")# Get connection details from your application configconnection_string=Application.get_env(:my_project,String.to_atom("clickhouse_#{env}_url"))conn=Pillar.Connection.new(connection_string)# Run the migrationscasePillar.Migrations.migrate(conn)do:ok->Mix.shell().info("Migrations completed successfully"){:error,reason}->Mix.shell().error("Migration failed:#{inspect(reason)}")exit({:shutdown,1})endendend
Then run the migrations with:
mix migrate_clickhouse# Or with environment specification:mix migrate_clickhouse --env=prod
Pillar automatically tracks applied migrations in a special table namedpillar_migrations
in your ClickHouse database. This table is created automatically and contains:
- The migration version (derived from the timestamp)
- The migration name
- When the migration was applied
Pillar supports timezone-aware DateTime operations when working with ClickHouse. This is particularly important when:
- Storing and retrieving DateTime values
- Performing date/time calculations across different time zones
- Ensuring consistent timestamp handling
To enable timezone support:
- Add the
tzdata
dependency to your project:
defpdepsdo[{:pillar,"~> 0.39.0"},{:tzdata,"~> 1.1"}]end
- Configure Elixir to use the Tzdata timezone database:
# In your config/config.exsconfig:elixir,:time_zone_database,Tzdata.TimeZoneDatabase
- Use timezone-aware DateTime functions in your code:
# Create a timezone-aware DateTimedatetime=DateTime.from_naive!(~N[2023-01-15 10:30:00],"Europe/Berlin")# Insert it into ClickHousePillar.insert(conn,"INSERT INTO events (id, timestamp) VALUES ({id}, {timestamp})",%{id:123,timestamp:datetime})
For more details on DateTime and timezone handling in Elixir, see theofficial documentation.
When creating a new connection withPillar.Connection.new/2
, you can specify various options (parameters):
Pillar.Connection.new("http://user:password@localhost:8123/database",# Params take precedence over URI string# Authentication optionsuser:"default",# Override username in URLpassword:"secret",# Override password in URL# Timeout optionstimeout:30_000,# Connection timeout in milliseconds# ClickHouse specific optionsdatabase:"my_database",# Override database in URLdefault_format:"JSON",# Default response format# Query execution optionsmax_execution_time:60,# Maximum query execution time in secondsmax_memory_usage:10000000,# Maximum memory usage for query in bytes)
When using a connection pool withuse Pillar
, you can configure the following options:
defmoduleMyApp.ClickHousedousePillar,# List of ClickHouse servers for load balancing and high availabilityconnection_strings:["http://user:password@clickhouse-1:8123/database","http://user:password@clickhouse-2:8123/database"],# Pool name (defaults to module name if not specified)name:__MODULE__,# Number of connections to maintain in the pool (default: 10)pool_size:15,# Maximum overflow connections (calculated as pool_size * 0.3 by default)max_overflow:5,# Time to wait when acquiring a connection from the pool in ms (default: 5000)pool_timeout:10_000,# Default query timeout in ms (default: 5000)timeout:30_000end
Pillar provides multiple HTTP adapters for communicating with ClickHouse:
- TeslaMintAdapter (default): Uses Tesla with Mint HTTP client
- HttpcAdapter: Uses Erlang's built-in
:httpc
module
If you encounter issues with the default adapter, you can switch to an alternative:
# In your config/config.exsconfig:pillar,Pillar.HttpClient,http_adapter:Pillar.HttpClient.HttpcAdapter
You can also implement your own HTTP adapter by creating a module that implements thepost/3
function and returns either a%Pillar.HttpClient.Response{}
or a%Pillar.HttpClient.TransportError{}
struct:
defmoduleMyApp.CustomHttpAdapterdo@behaviourPillar.HttpClient.Adapter@impltruedefpost(url,body,options)do# Implement your custom HTTP client logic# ...# Return a response struct%Pillar.HttpClient.Response{body:response_body,status:200,headers:[{"content-type","application/json"}]}endend# Configure Pillar to use your custom adapterconfig:pillar,Pillar.HttpClient,http_adapter:MyApp.CustomHttpAdapter
Pillar provides several strategies for handling bulk inserts:
Direct batch insert: Insert multiple records in a single query
records=[%{id:1,name:"Alice",score:85},%{id:2,name:"Bob",score:92},%{id:3,name:"Charlie",score:78}]Pillar.insert_to_table(conn,"students",records)
Buffered inserts: Use
Pillar.BulkInsertBuffer
for timed batch processing# Define a buffer module as shown in the Buffer section# Then insert records that will be buffered and flushed periodicallyStudentMetricsBuffer.insert(%{student_id:123,metric:"login",count:1})
Async inserts: Use
async_insert
for fire-and-forget operationsClickHouseMaster.async_insert_to_table("event_logs",%{event:"page_view",user_id:42,timestamp:DateTime.utc_now()})
Pillar handles type conversions between Elixir and ClickHouse automatically, but you can extend or customize this behavior:
# Convert a custom Elixir struct to a ClickHouse-compatible formatdefimplPillar.TypeConvert.ToClickHouse,for:MyApp.Userdodefconvert(user)do%{id:user.id,name:user.full_name,email:user.email,created_at:DateTime.to_iso8601(user.inserted_at)}endend
If you experience connection timeouts, consider:
Increasing the timeout values:
conn=Pillar.Connection.new(url,%{timeout:30_000})
Checking network connectivity between your application and ClickHouse
Verifying ClickHouse server is running and accepting connections:
curl http://clickhouse-server:8123/ping
For large queries that consume significant memory:
Add query settings to limit memory usage:
Pillar.query(conn,"SELECT * FROM huge_table",%{},%{max_memory_usage:10000000000,# 10GBmax_execution_time:300# 5 minutes})
Consider using streaming queries (with FORMAT CSV or TabSeparated) for very large result sets
If bulk inserts fail:
- Check your error handler in the
BulkInsertBuffer
configuration - Verify data types match the table schema
- Consider reducing batch sizes or increasing the interval between inserts
Use connection pooling: Always use a connection pool in production environments
Batch inserts: Group multiple inserts into a single operation when possible
Use async operations: For high-volume inserts where immediate confirmation isn't necessary
Query optimization: Leverage ClickHouse's strengths:
- Use proper table engines based on your query patterns
- Ensure you have appropriate indices
- Filter on columns used in the ORDER BY clause
Connection reuse: Avoid creating new connections for each query
Feel free to make a pull request. All contributions are appreciated!
About
Elixir library client for work with ClickHouse
Topics
Resources
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Packages0
Uh oh!
There was an error while loading.Please reload this page.