- Notifications
You must be signed in to change notification settings - Fork0
coded-streams/codedstreams-crypto-risk-contract-library
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
A shared Java library providing Avro-based data contracts for the CodedStreams Crypto Trading Platform. This library ensures type safety and schema consistency across all microservices in the risk control ecosystem.
The CodedStreams Crypto Risk Contract Library serves as the single source of truth for all data schemas used in the crypto trading risk control system. It provides:
- Avro Schema Definitions for trade events and risk alerts
- Auto-generated Java Classes for type-safe data handling
- Serialization Utilities for efficient Avro binary encoding
- Schema Evolution support for backward/forward compatibility
graph TB A[Avro Schema Files] --> B[Avro Maven Plugin] B --> C[Auto-generated Java Classes] C --> D[Serialization Utilities] E[Trade Producer] --> F[Uses Data Contracts] G[Risk Engine] --> F H[Alert Consumer] --> F subgraph "Shared Library" A B C D end subgraph "Microservices" E G H end F[Data Contracts JAR]Represents cryptocurrency trading events with comprehensive risk metadata.
Key Fields:
tradeId: Unique trade identifieruserId: User identifier for risk trackingsymbol: Trading pair (e.g., "BTC/USDT")side: BUY or SELLquantity: Trade quantity in base assetprice: Trade price in quote assetnotional: Calculated value (quantity × price)timestamp: Event timestamp in millisecondsriskLevel: Initial risk assessment (LOW, MEDIUM, HIGH, CRITICAL)
Represents risk alerts generated by the Flink CEP engine.
Key Fields:
alertId: Unique alert identifierseverity: Alert severity level (LOW, MEDIUM, HIGH, CRITICAL)patternType: Risk pattern that triggered the alertuserId: User who triggered the alertdescription: Human-readable alert descriptionrecommendedAction: Suggested mitigation stepstriggeredEvents: Map of events that triggered the pattern
Common metadata for platform-wide event tracing and versioning.
- Java 11 or higher
- Maven 3.6+
- Access to company Maven repository (or local installation)
Clone the repository:
git clone https://github.com/codedstreams/crypto-risk-contract-library.gitcd crypto-risk-contract-libraryBuild and install locally:
mvn clean install
Add dependency to your project:
<dependency> <groupId>com.codedstreams</groupId> <artifactId>crypto-risk-contract-library</artifactId> <version>1.0.2</version></dependency>
importcom.codedstreams.contracts.avro.CryptoTrade;importcom.codedstreams.contracts.avro.RiskAlert;importcom.codedstreams.contracts.AvroSerialization;// Create a trade eventCryptoTradetrade =CryptoTrade.newBuilder() .setTradeId("trade-12345") .setUserId("user-001") .setCustomerId("cust-1001") .setSymbol("BTC/USDT") .setBaseAsset("BTC") .setQuoteAsset("USDT") .setSide(CryptoTrade.TradeSide.BUY) .setOrderType(CryptoTrade.OrderType.LIMIT) .setQuantity(0.5) .setPrice(45000.0) .setNotional(22500.0) .setTimestamp(System.currentTimeMillis()) .setExchange("BINANCE") .setRiskLevel(CryptoTrade.RiskLevel.LOW) .build();// Serialize to byte arraybyte[]serializedTrade =AvroSerialization.serializeTrade(trade);// Deserialize from byte arrayCryptoTradedeserializedTrade =AvroSerialization.deserializeTrade(serializedTrade);
@ComponentpublicclassTradeProducerService {@AutowiredprivateKafkaTemplate<String,byte[]>kafkaTemplate;publicvoidproduceTrade(CryptoTradetrade) {try {byte[]serializedData =AvroSerialization.serializeTrade(trade);kafkaTemplate.send("crypto-trades",trade.getUserId().toString(),serializedData); }catch (IOExceptione) {thrownewRuntimeException("Failed to serialize trade",e); } }}
@ComponentpublicclassTradeConsumerService {@KafkaListener(topics ="crypto-trades")publicvoidconsumeTrade(byte[]message) {try {CryptoTradetrade =AvroSerialization.deserializeTrade(message);// Process the trade...processRiskAssessment(trade); }catch (IOExceptione) {log.error("Failed to deserialize trade",e); } }}
- Create new
.avscfile insrc/main/resources/avro/ - Run
mvn compileto generate Java classes - Add serialization methods in
AvroSerialization.java
- Add New Fields: Always provide default values
- Remove Fields: Mark as deprecated first, remove in next major version
- Change Types: Use union types for compatibility
- Versioning: Increment version in schema documentation
{"name":"newField","type": ["null","string"],"doc":"New field for enhanced risk analysis","default":null}For production environments, integrate with Confluent Schema Registry:
// Example configuration for schema registry@ConfigurationpublicclassSchemaRegistryConfig {@BeanpublicSchemaRegistryClientschemaRegistryClient() {returnnewCachedSchemaRegistryClient("http://schema-registry:8081",100 ); }}
publicclassAvroSerializationTest {@TestpublicvoidtestTradeSerializationRoundTrip()throwsIOException {// GivenCryptoTradeoriginalTrade =createTestTrade();// Whenbyte[]serialized =AvroSerialization.serializeTrade(originalTrade);CryptoTradedeserialized =AvroSerialization.deserializeTrade(serialized);// ThenassertEquals(originalTrade.getTradeId(),deserialized.getTradeId());assertEquals(originalTrade.getNotional(),deserialized.getNotional()); }privateCryptoTradecreateTestTrade() {returnCryptoTrade.newBuilder() .setTradeId("test-trade-001") .setUserId("test-user-001") .setCustomerId("test-cust-001") .setSymbol("BTC/USDT") .setBaseAsset("BTC") .setQuoteAsset("USDT") .setSide(CryptoTrade.TradeSide.BUY) .setOrderType(CryptoTrade.OrderType.LIMIT) .setQuantity(1.0) .setPrice(50000.0) .setNotional(50000.0) .setTimestamp(System.currentTimeMillis()) .setExchange("TEST_EXCHANGE") .setRiskLevel(CryptoTrade.RiskLevel.LOW) .build(); }}
@SpringBootTestpublicclassSchemaCompatibilityTest {@TestpublicvoidtestBackwardCompatibility() {// Test that new consumers can read old messages// and old consumers can read new messages }}
- Data Validation: All schemas include validation rules
- Sensitive Data: Wallet addresses and IPs are optional fields
- Audit Trail: All events include timestamps and correlation IDs
- Schema Signing: Consider digital signatures for production schemas
@ComponentpublicclassSchemaHealthIndicatorimplementsHealthIndicator {@OverridepublicHealthhealth() {try {// Validate all schemas can be compiledvalidateSchemas();returnHealth.up() .withDetail("schemas","healthy") .withDetail("version","1.0.0") .build(); }catch (Exceptione) {returnHealth.down(e).build(); } }}
@Slf4jpublicclassAvroSerialization {privatestaticfinalLoggerlog =LoggerFactory.getLogger(AvroSerialization.class);publicstaticbyte[]serializeTrade(CryptoTradetrade)throwsIOException {log.debug("Serializing trade: {}",trade.getTradeId());// ... serialization logic }}
The library includes GitHub Actions workflow for automated testing and deployment:
name:Build and Deploy Data Contractson:push:branches:[ main ]pull_request:branches:[ main ]jobs:build:runs-on:ubuntu-lateststeps: -uses:actions/checkout@v3 -name:Set up JDK 11uses:actions/setup-java@v3with:java-version:'11'distribution:'temurin'cache:'maven' -name:Build and Testrun:mvn clean verify -name:Deploy to Artifactoryif:github.ref == 'refs/heads/main'run:mvn deployenv:ARTIFACTORY_USER:${{ secrets.ARTIFACTORY_USER }}ARTIFACTORY_PASSWORD:${{ secrets.ARTIFACTORY_PASSWORD }}
- Major: Breaking schema changes
- Minor: Backward-compatible new features
- Patch: Bug fixes and documentation
- Fork the repository
- Create a feature branch:
git checkout -b feature/new-schema - Add/update Avro schemas in
src/main/resources/avro/ - Run
mvn compileto generate Java classes - Add tests for new functionality
- Submit a pull request
- All schemas must include comprehensive documentation
- New fields require default values for backward compatibility
- All generated code should be committed to repository
- Update README.md for new features
For issues and questions:
- Check existingGitHub Issues
- Create a new issue with schema examples and error messages
- Contact the Data Contracts team atdatacontracts@codedstreams.com
This project is licensed under Codedstreams as a personal portfolio project.
Maintained by: Nestor Martourez @ CodedStreams Data Streaming EngineeringLast Updated: October 2025
Compatible With: Trade Producer Simulator v1.0+, Risk Engine v1.0+
About
Crypto Risk Contract Library serves as the single source of truth for all data schemas used in the crypto trading risk control system
Topics
Resources
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Releases
Packages0
Uh oh!
There was an error while loading.Please reload this page.
Contributors2
Uh oh!
There was an error while loading.Please reload this page.