Elixir library client for work with ClickHouse


balance-platform/pillar


Elixir client for ClickHouse, a fast open-source Online Analytical Processing (OLAP) database management system.

Getting Started

Installation

Add pillar to your list of dependencies in mix.exs:

    def deps do
      [
        {:pillar, "~> 0.39.0"}
      ]
    end

Basic Usage

Here's a simple example to get started with Pillar:

    # Create a direct connection
    conn = Pillar.Connection.new("http://user:password@localhost:8123/database")

    # Execute a SELECT query
    {:ok, result} = Pillar.select(conn, "SELECT * FROM users LIMIT 10")

    # Execute a parameterized query
    {:ok, result} =
      Pillar.query(
        conn,
        "SELECT * FROM users WHERE created_at > {min_date} LIMIT {limit}",
        %{min_date: "2023-01-01", limit: 100}
      )

    # Insert data
    {:ok, _} =
      Pillar.insert(
        conn,
        "INSERT INTO events (user_id, event_type, created_at) VALUES ({user_id}, {event_type}, {created_at})",
        %{user_id: 123, event_type: "login", created_at: DateTime.utc_now()}
      )

Features

Pillar offers a comprehensive set of features for working with ClickHouse:

Direct Usage with connection structure

The most straightforward way to use Pillar is by creating a direct connection to your ClickHouse server:

    # Create a connection to your ClickHouse server
    conn = Pillar.Connection.new("http://user:password@localhost:8123/database")

    # Parameters are passed in curly braces {} in the SQL query
    # The map structure provides values for these parameters
    sql = "SELECT count(*) FROM users WHERE lastname = {lastname}"
    params = %{lastname: "Smith"}

    {:ok, result} = Pillar.query(conn, sql, params)

    result
    #=> [%{"count(*)" => 347}]
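The result shape shown above can be unpacked with pattern matching. The following is a minimal sketch and not part of Pillar: the module name UserStats and the function extract_count/1 are hypothetical, and the {:error, reason} clause assumes that Pillar.query/3 reports failures as an error tuple, the way Pillar.Migrations.migrate/1 does in the migration task shown in this README.

```elixir
defmodule UserStats do
  # Hypothetical helper: takes the return value of a count(*) query
  # and reduces it to {:ok, integer} or {:error, reason}.
  def extract_count({:ok, [%{"count(*)" => count}]}), do: {:ok, count}

  # A successful response with a shape other than a single count row.
  def extract_count({:ok, other}), do: {:error, {:unexpected_result, other}}

  # Assumed error shape; passed through unchanged.
  def extract_count({:error, reason}), do: {:error, reason}
end
```

With a real connection this could be called as UserStats.extract_count(Pillar.query(conn, sql, params)).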

You can also specify additional connection options:

    conn =
      Pillar.Connection.new("http://user:password@localhost:8123/database", %{
        timeout: 30_000,         # Connection timeout in milliseconds
        pool_timeout: 5_000,     # Pool timeout in milliseconds
        request_timeout: 60_000  # Request timeout in milliseconds
      })

Pool of workers

For production environments, using a connection pool is highly recommended. This approach provides:

  • Efficient connection management
  • Supervised workers
  • Load balancing across multiple ClickHouse servers
  • Better performance under high load

    defmodule ClickhouseMaster do
      use Pillar,
        connection_strings: [
          "http://user:password@host-master-1:8123/database",
          "http://user:password@host-master-2:8123/database"
        ],
        name: __MODULE__,
        pool_size: 15,
        pool_timeout: 10_000,  # Time to wait for a connection from the pool
        timeout: 30_000        # Default query timeout
    end

    # Start the connection pool as part of your supervision tree
    ClickhouseMaster.start_link()

    # Execute queries using the pool
    {:ok, result} = ClickhouseMaster.select("SELECT * FROM users WHERE age > {min_age}", %{min_age: 21})
    {:ok, _} = ClickhouseMaster.insert("INSERT INTO logs (message) VALUES ({message})", %{message: "User logged in"})

The pool automatically manages connection acquisition and release, and can handle multiple ClickHouse servers for load balancing and high availability.
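One way to start the pool as part of a supervision tree is an explicit child spec. This is a sketch under assumptions: MyApp.ClickhouseSupervisor is a hypothetical module, and the pool module is assumed to expose start_link/0 (as called above) returning {:ok, pid} in the usual OTP style. The explicit map avoids relying on a child_spec/1 function that the pool module may or may not define.

```elixir
defmodule MyApp.ClickhouseSupervisor do
  use Supervisor

  # pool_module is the module defined with `use Pillar`,
  # for example ClickhouseMaster.
  def start_link(pool_module) do
    Supervisor.start_link(__MODULE__, pool_module, name: __MODULE__)
  end

  @impl true
  def init(pool_module) do
    children = [
      # Explicit child spec: the supervisor calls pool_module.start_link()
      # and restarts the pool if it crashes.
      %{id: pool_module, start: {pool_module, :start_link, []}}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

In an application, MyApp.ClickhouseSupervisor.start_link(ClickhouseMaster) would typically be replaced by listing {MyApp.ClickhouseSupervisor, ClickhouseMaster} among the children of the top-level supervisor.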

Async insert

Asynchronous inserts are useful for non-blocking operations when you don't need to wait for a response. This is particularly valuable for:

  • Logging events
  • Metrics collection
  • Any high-volume insert operations where immediate confirmation isn't required

    # Using a connection pool (recommended approach)
    ClickhouseMaster.async_insert(
      "INSERT INTO events (user_id, event, timestamp) VALUES ({user_id}, {event}, {timestamp})",
      %{user_id: user.id, event: "password_changed", timestamp: DateTime.utc_now()}
    )
    # => :ok

    # The request is sent and the function returns immediately without waiting for a response

Note: Async inserts are only available when using a connection pool created with use Pillar. If you attempt to use Pillar.async_insert/4 directly with a connection structure, it will raise an error.

Buffer for periodical bulk inserts

The bulk insert buffer feature allows you to collect records in memory and insert them in batches at specified intervals. This is highly efficient for:

  • High-frequency event logging
  • Metrics collection
  • Any scenario where you need to insert many small records

This feature requires a Pool of workers to be set up first.

    defmodule BulkToLogs do
      use Pillar.BulkInsertBuffer,
        # Reference to your Pillar connection pool
        pool: ClickhouseMaster,
        # Target table for inserts
        table_name: "logs",
        # How often to flush buffered records (seconds)
        # Default is 5 seconds if not specified
        interval_between_inserts_in_seconds: 5,
        # Optional error handler function
        on_errors: &__MODULE__.dump_to_file/2,
        # Maximum records to buffer before forcing a flush
        # Optional, defaults to 10000
        max_buffer_size: 5000

      @doc """
      Error handler that stores failed inserts into a file

      Parameters:
      - result: The error result from ClickHouse
      - records: The batch of records that failed to insert
      """
      def dump_to_file(_result, records) do
        timestamp = DateTime.utc_now() |> DateTime.to_string() |> String.replace(":", "-")
        directory = "bad_inserts"

        # Ensure the directory exists
        File.mkdir_p!(directory)

        # Write failed records to a file
        File.write("#{directory}/#{timestamp}.log", inspect(records, pretty: true))
      end

      @doc """
      Alternative error handler that attempts to retry failed inserts
      Note: Retrying can be risky in case of persistent errors
      """
      def retry_insert(_result, records) do
        # Add a short delay before retrying
        Process.sleep(1000)
        __MODULE__.insert(records)
      end
    end

Usage example:

    # Records are buffered in memory until the flush interval
    :ok = BulkToLogs.insert(%{value: "online", count: 133, datetime: DateTime.utc_now()})
    :ok = BulkToLogs.insert(%{value: "online", count: 134, datetime: DateTime.utc_now()})
    :ok = BulkToLogs.insert(%{value: "offline", count: 42, datetime: DateTime.utc_now()})

    # All these records will be inserted in a single batch after the configured interval (5 seconds by default)
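To make the buffering behaviour concrete, here is a simplified illustration of the general idea: collect records in memory, then flush on a timer or when a size limit is reached. It is not Pillar's implementation. SketchBuffer and its options (flush_fun, interval_ms, max_size) are hypothetical names chosen for this sketch, and the flush function stands in for the actual batch insert.

```elixir
defmodule SketchBuffer do
  use GenServer

  # opts: :flush_fun (1-arity function receiving the batch),
  #       :interval_ms (flush period), :max_size (forced-flush threshold)
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Non-blocking: the record is queued and :ok is returned immediately.
  def insert(pid, record), do: GenServer.cast(pid, {:insert, record})

  @impl true
  def init(opts) do
    state = %{
      flush_fun: Keyword.fetch!(opts, :flush_fun),
      interval_ms: Keyword.get(opts, :interval_ms, 5_000),
      max_size: Keyword.get(opts, :max_size, 10_000),
      records: []
    }

    schedule_flush(state)
    {:ok, state}
  end

  @impl true
  def handle_cast({:insert, record}, state) do
    state = %{state | records: [record | state.records]}

    # Force a flush once the buffer reaches its size limit.
    if length(state.records) >= state.max_size do
      {:noreply, flush(state)}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_info(:flush, state) do
    # Periodic flush: re-arm the timer, then hand over the batch.
    schedule_flush(state)
    {:noreply, flush(state)}
  end

  # Nothing buffered: skip the call entirely.
  defp flush(%{records: []} = state), do: state

  defp flush(state) do
    # Records were prepended, so reverse to restore insertion order.
    state.flush_fun.(Enum.reverse(state.records))
    %{state | records: []}
  end

  defp schedule_flush(state) do
    Process.send_after(self(), :flush, state.interval_ms)
  end
end
```

The trade-off is the same one the real buffer makes: records that are still in memory when the process stops are lost, in exchange for far fewer insert requests.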

The on_errors parameter is a callback function that will be invoked if an error occurs during bulk insert. This is useful for:

  • Logging failed inserts
  • Writing failed records to a backup location
  • Implementing custom retry logic

The callback receives two parameters:

  1. The error result from ClickHouse
  2. The batch of records that failed to insert
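For the logging case, a two-argument handler can be as small as the sketch below. BulkErrorLogger and log_failure/2 are hypothetical names; the only thing taken from the description above is the argument order (error result first, failed batch second). It would be wired in with on_errors: &BulkErrorLogger.log_failure/2.

```elixir
defmodule BulkErrorLogger do
  require Logger

  # Matches the two-argument shape of the on_errors callback:
  # the error result, then the batch of records that failed.
  def log_failure(result, records) do
    Logger.error(
      "Bulk insert failed: #{inspect(result)} (#{length(records)} records not inserted)"
    )
  end
end
```

Logging only the record count keeps log lines short; if the records themselves must be recoverable, a handler that writes them to disk (as in dump_to_file/2 above) is the safer choice.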

Migrations

Pillar provides a migrations system to help you manage your ClickHouse database schema changes in a version-controlled manner. This feature is particularly useful for:

  • Creating tables
  • Modifying schema
  • Ensuring consistent database setup across environments
  • Tracking schema changes over time

Generating Migrations

Migrations can be generated with the mix task:

mix pillar.gen.migration create_events_table

This creates a new migration file in priv/pillar_migrations with a timestamp prefix, for example: priv/pillar_migrations/20250528120000_create_events_table.exs

Basic Migration Structure

    defmodule Pillar.Migrations.CreateEventsTable do
      def up do
        """
        CREATE TABLE IF NOT EXISTS events (
          id UUID,
          user_id UInt64,
          event_type String,
          payload String,
          created_at DateTime
        ) ENGINE = MergeTree()
        ORDER BY (created_at, id)
        """
      end

      # Optional: Implement a down function for rollbacks
      def down do
        "DROP TABLE IF EXISTS events"
      end
    end

Multi-Statement Migrations

For complex scenarios where you need to execute multiple statements in a single migration, return a list of strings:

    defmodule Pillar.Migrations.CreateMultipleTables do
      def up do
        # For multi-statement migrations, return a list of strings
        [
          """
          CREATE TABLE IF NOT EXISTS events (
            id UUID,
            user_id UInt64,
            event_type String,
            created_at DateTime
          ) ENGINE = MergeTree()
          ORDER BY (created_at, id)
          """,
          """
          CREATE TABLE IF NOT EXISTS event_metrics (
            date Date,
            event_type String,
            count UInt64
          ) ENGINE = SummingMergeTree(count)
          ORDER BY (date, event_type)
          """
        ]
      end
    end

You can also dynamically generate migrations:

    defmodule Pillar.Migrations.CreateShardedTables do
      def up do
        # Generate 5 sharded tables
        (0..4)
        |> Enum.map(fn i ->
          """
          CREATE TABLE IF NOT EXISTS events_shard_#{i} (
            id UUID,
            user_id UInt64,
            event_type String,
            created_at DateTime
          ) ENGINE = MergeTree()
          ORDER BY (created_at, id)
          """
        end)
      end
    end

Running Migrations

To run migrations, create a mix task:

    defmodule Mix.Tasks.MigrateClickhouse do
      use Mix.Task

      @shortdoc "Runs ClickHouse migrations"
      def run(args) do
        # Start any necessary applications
        Application.ensure_all_started(:pillar)

        # Parse command line arguments if needed
        {opts, _, _} = OptionParser.parse(args, strict: [env: :string])
        env = Keyword.get(opts, :env, "dev")

        # Get connection details from your application config
        connection_string =
          Application.get_env(:my_project, String.to_atom("clickhouse_#{env}_url"))

        conn = Pillar.Connection.new(connection_string)

        # Run the migrations
        case Pillar.Migrations.migrate(conn) do
          :ok ->
            Mix.shell().info("Migrations completed successfully")

          {:error, reason} ->
            Mix.shell().error("Migration failed: #{inspect(reason)}")
            exit({:shutdown, 1})
        end
      end
    end

Then run the migrations with:

    mix migrate_clickhouse

    # Or with environment specification:
    mix migrate_clickhouse --env=prod

Migration Tracking

Pillar automatically tracks applied migrations in a special table named pillar_migrations in your ClickHouse database. This table is created automatically and contains:

  • The migration version (derived from the timestamp)
  • The migration name
  • When the migration was applied
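The relationship between a migration file name and its version and name can be illustrated with a small parser. This is only a sketch of the naming convention described above (a 14-digit timestamp, an underscore, then the name), not the code Pillar uses internally; MigrationFilename and parse/1 are hypothetical.

```elixir
defmodule MigrationFilename do
  # Splits "20250528120000_create_events_table.exs" into its
  # timestamp-derived version and its name.
  def parse(path) do
    case Regex.run(~r/^(\d{14})_(.+)\.exs$/, Path.basename(path)) do
      [_full, version, name] ->
        {:ok, %{version: String.to_integer(version), name: name}}

      nil ->
        :error
    end
  end
end
```

Because the version is a timestamp, sorting migrations by version also sorts them by creation time, which is what makes the order of application predictable.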

Timezones

Pillar supports timezone-aware DateTime operations when working with ClickHouse. This is particularly important when:

  • Storing and retrieving DateTime values
  • Performing date/time calculations across different time zones
  • Ensuring consistent timestamp handling

To enable timezone support:

  1. Add the tzdata dependency to your project:

    defp deps do
      [
        {:pillar, "~> 0.39.0"},
        {:tzdata, "~> 1.1"}
      ]
    end

  2. Configure Elixir to use the Tzdata timezone database:

    # In your config/config.exs
    config :elixir, :time_zone_database, Tzdata.TimeZoneDatabase

  3. Use timezone-aware DateTime functions in your code:

    # Create a timezone-aware DateTime
    datetime = DateTime.from_naive!(~N[2023-01-15 10:30:00], "Europe/Berlin")

    # Insert it into ClickHouse
    Pillar.insert(
      conn,
      "INSERT INTO events (id, timestamp) VALUES ({id}, {timestamp})",
      %{id: 123, timestamp: datetime}
    )

For more details on DateTime and timezone handling in Elixir, see the official documentation.
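The configuration step matters because Elixir ships with a UTC-only time zone database: until another database is configured, DateTime functions return an error for any zone other than "Etc/UTC". The sketch below is a quick way to check this at startup; TimezoneCheck and zone_available?/1 are hypothetical names, while DateTime.now/2 is part of the Elixir standard library.

```elixir
defmodule TimezoneCheck do
  # True when the configured time zone database can resolve the zone.
  # With Elixir's default database only "Etc/UTC" resolves; other zones
  # yield {:error, :utc_only_time_zone_database}.
  def zone_available?(zone) do
    match?({:ok, _}, DateTime.now(zone))
  end
end
```

Calling TimezoneCheck.zone_available?("Europe/Berlin") returns false in a project without a time zone database configured, and should return true once tzdata is installed and configured as in step 2.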

Configuration

Connection Options

When creating a new connection with Pillar.Connection.new/2, you can specify various options (parameters):

    Pillar.Connection.new(
      "http://user:password@localhost:8123/database",
      # Params take precedence over URI string

      # Authentication options
      user: "default",           # Override username in URL
      password: "secret",        # Override password in URL

      # Timeout options
      timeout: 30_000,           # Connection timeout in milliseconds

      # ClickHouse specific options
      database: "my_database",   # Override database in URL
      default_format: "JSON",    # Default response format

      # Query execution options
      max_execution_time: 60,    # Maximum query execution time in seconds
      max_memory_usage: 10000000 # Maximum memory usage for query in bytes
    )

Pool Configuration

When using a connection pool with use Pillar, you can configure the following options:

    defmodule MyApp.ClickHouse do
      use Pillar,
        # List of ClickHouse servers for load balancing and high availability
        connection_strings: [
          "http://user:password@clickhouse-1:8123/database",
          "http://user:password@clickhouse-2:8123/database"
        ],

        # Pool name (defaults to module name if not specified)
        name: __MODULE__,

        # Number of connections to maintain in the pool (default: 10)
        pool_size: 15,

        # Maximum overflow connections (calculated as pool_size * 0.3 by default)
        max_overflow: 5,

        # Time to wait when acquiring a connection from the pool in ms (default: 5000)
        pool_timeout: 10_000,

        # Default query timeout in ms (default: 5000)
        timeout: 30_000
    end

HTTP Adapters

Pillar provides multiple HTTP adapters for communicating with ClickHouse:

  1. TeslaMintAdapter (default): Uses Tesla with Mint HTTP client
  2. HttpcAdapter: Uses Erlang's built-in :httpc module

If you encounter issues with the default adapter, you can switch to an alternative:

    # In your config/config.exs
    config :pillar, Pillar.HttpClient, http_adapter: Pillar.HttpClient.HttpcAdapter

You can also implement your own HTTP adapter by creating a module that implements the post/3 function and returns either a %Pillar.HttpClient.Response{} or a %Pillar.HttpClient.TransportError{} struct:

    defmodule MyApp.CustomHttpAdapter do
      @behaviour Pillar.HttpClient.Adapter

      @impl true
      def post(url, body, options) do
        # Implement your custom HTTP client logic
        # ...

        # Return a response struct
        # (response_body is a placeholder for the body returned by your HTTP client)
        %Pillar.HttpClient.Response{
          body: response_body,
          status: 200,
          headers: [{"content-type", "application/json"}]
        }
      end
    end

    # Configure Pillar to use your custom adapter (in config/config.exs)
    config :pillar, Pillar.HttpClient, http_adapter: MyApp.CustomHttpAdapter

Advanced Usage

Bulk Insert Strategies

Pillar provides several strategies for handling bulk inserts:

  1. Direct batch insert: Insert multiple records in a single query

    records = [
      %{id: 1, name: "Alice", score: 85},
      %{id: 2, name: "Bob", score: 92},
      %{id: 3, name: "Charlie", score: 78}
    ]

    Pillar.insert_to_table(conn, "students", records)

  2. Buffered inserts: Use Pillar.BulkInsertBuffer for timed batch processing

    # Define a buffer module as shown in the Buffer section
    # Then insert records that will be buffered and flushed periodically
    StudentMetricsBuffer.insert(%{student_id: 123, metric: "login", count: 1})

  3. Async inserts: Use async_insert or async_insert_to_table for fire-and-forget operations

    ClickhouseMaster.async_insert_to_table("event_logs", %{
      event: "page_view",
      user_id: 42,
      timestamp: DateTime.utc_now()
    })

Custom Type Conversions

Pillar handles type conversions between Elixir and ClickHouse automatically, but you can extend or customize this behavior:

    # Convert a custom Elixir struct to a ClickHouse-compatible format
    defimpl Pillar.TypeConvert.ToClickHouse, for: MyApp.User do
      def convert(user) do
        %{
          id: user.id,
          name: user.full_name,
          email: user.email,
          created_at: DateTime.to_iso8601(user.inserted_at)
        }
      end
    end

Troubleshooting

Common Issues

Connection Timeouts

If you experience connection timeouts, consider:

  1. Increasing the timeout values:

    conn = Pillar.Connection.new(url, %{timeout: 30_000})

  2. Checking network connectivity between your application and ClickHouse

  3. Verifying ClickHouse server is running and accepting connections:

    curl http://clickhouse-server:8123/ping

Memory Limitations

For large queries that consume significant memory:

  1. Add query settings to limit memory usage:

    Pillar.query(conn, "SELECT * FROM huge_table", %{}, %{
      max_memory_usage: 10000000000,  # 10GB
      max_execution_time: 300         # 5 minutes
    })

  2. Consider using streaming queries (with FORMAT CSV or TabSeparated) for very large result sets

Bulk Insert Failures

If bulk inserts fail:

  1. Check your error handler in the BulkInsertBuffer configuration
  2. Verify data types match the table schema
  3. Consider reducing batch sizes or increasing the interval between inserts
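Reducing batch sizes for a direct insert can be sketched with Enum.chunk_every/2. ChunkedInsert and insert_in_chunks/3 are hypothetical names, and the insert function is passed in so the helper does not assume anything about Pillar beyond a return value of {:ok, _} or {:error, reason}. With a connection it could be called with fn chunk -> Pillar.insert_to_table(conn, "students", chunk) end.

```elixir
defmodule ChunkedInsert do
  # Splits records into chunks of chunk_size and inserts them one by one.
  # Stops at the first failure and returns the chunk that failed, so the
  # caller can log it or retry it.
  def insert_in_chunks(records, chunk_size, insert_fun) when chunk_size > 0 do
    records
    |> Enum.chunk_every(chunk_size)
    |> Enum.reduce_while(:ok, fn chunk, :ok ->
      case insert_fun.(chunk) do
        {:ok, _} -> {:cont, :ok}
        {:error, reason} -> {:halt, {:error, reason, chunk}}
      end
    end)
  end
end
```

Chunks inserted before a failure stay inserted, so a retry should resume from the failed chunk rather than from the start.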

Performance Optimization

  1. Use connection pooling: Always use a connection pool in production environments

  2. Batch inserts: Group multiple inserts into a single operation when possible

  3. Use async operations: For high-volume inserts where immediate confirmation isn't necessary

  4. Query optimization: Leverage ClickHouse's strengths:

    • Use proper table engines based on your query patterns
    • Ensure you have appropriate indices
    • Filter on columns used in the ORDER BY clause
  5. Connection reuse: Avoid creating new connections for each query

Contribution

Feel free to make a pull request. All contributions are appreciated!
