|
107 | 107 | * href="https://clickhouse.com/docs/guides/developer/deduplication">Deduplication strategies |
108 | 108 | * documentation</a> |
109 | 109 | * |
| 110 | + * <h4>User agent</h4> |
| 111 | + * |
| 112 | + * <p>The connector automatically sets the ClickHouse client name to {@code Apache Beam/<version>}, |
| 113 | + * which is visible in {@code system.query_log.client_name}. If you set the {@code client_name} |
| 114 | + * connection property, it is appended after the Beam identifier, for example: |
| 115 | + * |
| 116 | + * <pre>{@code |
| 117 | + * Properties props = new Properties(); |
| 118 | + * props.setProperty("client_name", "MyApp/1.0"); |
| 119 | + * // Results in: "Apache Beam/<version> MyApp/1.0" |
| 120 | + * }</pre> |
| 121 | + * |
110 | 122 | * <h4>Mapping between Beam and ClickHouse types</h4> |
111 | 123 | * |
112 | 124 | * <table summary="Type mapping"> |
@@ -244,21 +256,19 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> |
244 | 256 |
|
245 | 257 | @Override |
246 | 258 | public PDone expand(PCollection<T> input) { |
| 259 | + |
| 260 | + Properties properties = properties(); |
| 261 | + set(properties, "client_name", buildClientName(properties)); |
| 262 | + |
247 | 263 | TableSchema tableSchema = tableSchema(); |
248 | 264 | if (tableSchema == null) { |
249 | | - tableSchema = getTableSchema(clickHouseUrl(), database(), table(), properties()); |
| 265 | + tableSchema = getTableSchema(clickHouseUrl(), database(), table(), properties); |
250 | 266 | } |
251 | 267 |
|
252 | | - String sdkVersion = ReleaseInfo.getReleaseInfo().getSdkVersion(); |
253 | | - String userAgent = String.format("Apache Beam/%s", sdkVersion); |
254 | | - |
255 | | - Properties properties = properties(); |
256 | | - |
257 | 268 | set(properties, "max_insert_block_size", maxInsertBlockSize()); |
258 | 269 | set(properties, "insert_quorum", insertQuorum()); |
259 | 270 | set(properties, "insert_distributed_sync", insertDistributedSync()); |
260 | 271 | set(properties, "insert_deduplication", insertDeduplicate()); |
261 | | - set(properties, "product_name", userAgent); |
262 | 272 |
|
263 | 273 | WriteFn<T> fn = |
264 | 274 | new AutoValue_ClickHouseIO_WriteFn.Builder<T>() |
@@ -526,8 +536,7 @@ public void setup() throws Exception { |
526 | 536 | .setPassword(password) |
527 | 537 | .setDefaultDatabase(database()) |
528 | 538 | .setOptions(options) |
529 | | - .setClientName( |
530 | | - String.format("Apache Beam/%s", ReleaseInfo.getReleaseInfo().getSdkVersion())); |
| 539 | + .setClientName(properties().getProperty("client_name")); |
531 | 540 |
|
532 | 541 | // Add optional compression if specified in properties |
533 | 542 | String compress = properties().getProperty("compress", "false"); |
@@ -725,8 +734,7 @@ public static TableSchema getTableSchema( |
725 | 734 | .setUsername(user) |
726 | 735 | .setPassword(password) |
727 | 736 | .setDefaultDatabase(database) |
728 | | - .setClientName( |
729 | | - String.format("Apache Beam/%s", ReleaseInfo.getReleaseInfo().getSdkVersion())); |
| 737 | + .setClientName(buildClientName(properties)); |
730 | 738 |
|
731 | 739 | try (Client client = clientBuilder.build()) { |
732 | 740 | String query = "DESCRIBE TABLE " + quoteIdentifier(table); |
@@ -766,6 +774,17 @@ public static TableSchema getTableSchema( |
766 | 774 | } |
767 | 775 | } |
768 | 776 |
|
| 777 | + @VisibleForTesting |
| 778 | + static String buildClientName(Properties properties) { |
| 779 | + String beamAgent = |
| 780 | + String.format("Apache Beam/%s", ReleaseInfo.getReleaseInfo().getSdkVersion()); |
| 781 | + String existingClientName = properties.getProperty("client_name"); |
| 782 | + if (!Strings.isNullOrEmpty(existingClientName)) { |
| 783 | + return beamAgent + " " + existingClientName; |
| 784 | + } |
| 785 | + return beamAgent; |
| 786 | + } |
| 787 | + |
769 | 788 | static String quoteIdentifier(String identifier) { |
770 | 789 | String backslash = "\\\\"; |
771 | 790 | String quote = "\""; |
|
0 commit comments