# SeaTunnel 2.3.11: Syncing MySQL CDC into ClickHouse — Complete Data Flow Analysis

## Overview

This document analyzes, in detail, the complete data flow from the MySQL CDC source to the ClickHouse sink when `startup_mode = initial`. We walk through the concrete function call chain of each stage, the data conversions along the way, and the key implementation details.

## Overall Flow

```
MySQL Binlog/Snapshot → CDC Source Reader → SeaTunnel Row → ClickHouse Sink Writer → ClickHouse
```

## Detailed Call Chain Analysis

### Phase 1: Job Startup and Initialization

#### 1.1 SeaTunnel job startup

```
SeaTunnel.run(command) → command.execute() → TaskExecution.execute()
```

Entry point: `org.apache.seatunnel.core.starter.SeaTunnel.run()`

- Receives the command line arguments and starts the SeaTunnel job
- Delegates to the concrete execution command

#### 1.2 Source initialization

```
MySqlIncrementalSource.createReader() → IncrementalSourceReader() → MySqlDialect.createFetchTaskContext()
```

Key function: `MySqlIncrementalSource.createReader()`

```java
@Override
public SourceReader createReader(SourceReader.Context context) throws Exception {
    // Create the incremental source reader
    return new IncrementalSourceReader(
            dataSourceDialect,
            elementsQueue,
            splitReaderSupplier,
            recordEmitter,
            options,
            context,
            configFactory.create(0),
            deserializationSchema);
}
```

#### 1.3 Sink initialization

```
ClickhouseSink.createWriter() → ClickhouseSinkWriter() → initStatementMap()
```

Key function: `ClickhouseSink.createWriter()`

```java
@Override
public ClickhouseSinkWriter createWriter(SinkWriter.Context context) throws IOException {
    // 1. Create the ClickHouse node connections
    List nodes = ClickhouseUtil.createNodes(readonlyConfig);
    // 2. Create the proxy connection
    ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
    // 3. Fetch the target table schema
    Map tableSchema = proxy.getClickhouseTableSchema(readonlyConfig.get(TABLE));
    // 4. Create the shard router
    ShardRouter shardRouter = new ShardRouter(proxy, option.getShardMetadata());
    // 5. Create the per-shard batch executor map
    Map statementMap = initStatementMap();
    return new ClickhouseSinkWriter(option, context);
}
```

### Phase 2: Data Splitting and Assignment (Source side)

#### 2.1 Split assigner initialization

```
IncrementalSourceEnumerator → HybridSplitAssigner → MySqlChunkSplitter
```

Key function: `HybridSplitAssigner.getNext()`

```java
@Override
public Optional<SourceSplitBase> getNext() {
    // 1. Assign snapshot splits first
    if (!snapshotAssigner.isFinished()) {
        return snapshotAssigner.getNext();
    }
    // 2. Once the snapshot phase is finished, assign incremental splits
    if (incrementalSplitAssigner != null) {
        return incrementalSplitAssigner.getNext();
    }
    return Optional.empty();
}
```

#### 2.2 Chunk splitting algorithm

```
MySqlChunkSplitter.queryMinMax() → MySqlUtils.queryMinMax() → buildSplitScanQuery()
```

Key function: `MySqlUtils.queryMinMax()`

```java
public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
        throws SQLException {
    final String minMaxQuery =
            String.format(
                    "SELECT MIN(%s), MAX(%s) FROM %s",
                    quote(columnName), quote(columnName), quote(tableId));
    return jdbc.queryAndMap(
            minMaxQuery,
            rs -> {
                if (!rs.next()) {
                    throw new SQLException(
                            String.format(
                                    "No result returned after running query [%s]", minMaxQuery));
                }
                return rowToArray(rs, 2);
            });
}
```
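Conceptually, once MIN and MAX of the split key are known, the chunk splitter cuts the key range into roughly even chunks, each of which becomes one snapshot split. The sketch below illustrates only that idea in isolation; the `Range`/`splitRanges` names and the fixed numeric chunk size are simplifications for this example, not the actual `MySqlChunkSplitter` implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkBoundarySketch {

    /** One half-open split range [start, end); null means unbounded on that side. */
    record Range(Long start, Long end) {}

    /**
     * Derive evenly sized chunk boundaries over a numeric split key,
     * analogous in spirit to how the MIN/MAX result is turned into snapshot splits.
     */
    static List<Range> splitRanges(long min, long max, long chunkSize) {
        List<Range> ranges = new ArrayList<>();
        ranges.add(new Range(null, min + chunkSize));        // first split: (-inf, min+chunkSize)
        long start = min + chunkSize;
        while (start + chunkSize <= max) {
            ranges.add(new Range(start, start + chunkSize)); // middle splits: [start, start+chunkSize)
            start += chunkSize;
        }
        ranges.add(new Range(start, null));                  // last split: [start, +inf)
        return ranges;
    }

    public static void main(String[] args) {
        // e.g. MIN(id)=1, MAX(id)=10_000, chunk size 2_500
        splitRanges(1, 10_000, 2_500).forEach(System.out::println);
    }
}
```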
### Phase 3: Full Data Read (Source side)

#### 3.1 Snapshot task execution

```
MySqlSnapshotFetchTask.execute() → MySqlSnapshotSplitReadTask.execute() → doExecute()
```

Key function: `MySqlSnapshotSplitReadTask.doExecute()`

```java
@Override
protected SnapshotResult doExecute(
        ChangeEventSourceContext context,
        MySqlOffsetContext previousOffset,
        SnapshotContext snapshotContext,
        SnapshottingTask snapshottingTask) throws Exception {
    final MySqlSnapshotContext ctx = (MySqlSnapshotContext) snapshotContext;
    ctx.offset = offsetContext;

    // Step 1: determine the low watermark (binlog position before the snapshot starts)
    final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
    LOG.info("Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, snapshotSplit);
    ((SnapshotSplitChangeEventSourceContext) context).setLowWatermark(lowWatermark);
    dispatcher.dispatchWatermarkEvent(
            ctx.partition.getSourcePartition(), snapshotSplit, lowWatermark, WatermarkKind.LOW);

    // Step 2: snapshot the data
    LOG.info("Snapshot step 2 - Snapshotting data");
    createDataEvents(ctx, snapshotSplit.getTableId());

    // Step 3: determine the high watermark (binlog position after the snapshot finishes)
    final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
    LOG.info("Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, snapshotSplit);
    ((SnapshotSplitChangeEventSourceContext) context).setHighWatermark(highWatermark);
    dispatcher.dispatchWatermarkEvent(
            ctx.partition.getSourcePartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH);

    return SnapshotResult.completed(ctx.offset);
}
```

#### 3.2 Data event creation

```
createDataEvents() → createDataEventsForTable()
```

Key function: `MySqlSnapshotSplitReadTask.createDataEventsForTable()`

```java
private void createDataEventsForTable(
        MySqlSnapshotContext snapshotContext,
        EventDispatcher.SnapshotReceiver snapshotReceiver,
        Table table) throws InterruptedException {

    long exportStart = clock.currentTimeInMillis();
    LOG.info("Exporting data from split {} of table {}", snapshotSplit.splitId(), table.id());

    // Build the split scan SQL
    final String selectSql =
            buildSplitScanQuery(
                    snapshotSplit.getTableId(),
                    snapshotSplit.getSplitKeyType(),
                    snapshotSplit.getSplitStart() == null,
                    snapshotSplit.getSplitEnd() == null);

    // Execute the query and emit one data change event per row
    try (PreparedStatement statement = readTableSplitDataStatement(selectSql, snapshotSplit);
            ResultSet resultSet = statement.executeQuery()) {

        ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(table, resultSet.getMetaData());
        long rows = 0;
        Timer logTimer = getTableScanLogTimer();

        while (resultSet.next()) {
            rows++;
            final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
            // Extract the column values from the ResultSet
            for (int i = 0; i < columnArray.getColumns().length; i++) {
                Column column = columnArray.getColumns()[i];
                int position = columnArray.getColumnPositions()[i];
                row[position - 1] = readColumnValue(resultSet, i + 1, column);
            }
            // Create the snapshot change record
            SnapshotChangeRecordEmitter recordEmitter =
                    new SnapshotChangeRecordEmitter(
                            partition, offsetContext, table.id(), row, clock.currentTimeAsInstant());
            // Dispatch the data change event
            dispatcher.dispatchSnapshotEvent(snapshotContext.partition, table.id(), recordEmitter);

            // Log progress periodically
            if (logTimer.expired()) {
                LOG.info("Exported {} records for table {} from split {}",
                        rows, table.id(), snapshotSplit.splitId());
                logTimer = getTableScanLogTimer();
            }
        }
        LOG.info("Exported {} records for table {} from split {}",
                rows, table.id(), snapshotSplit.splitId());
    }
}
```
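For context, `buildSplitScanQuery()` produces a range scan over the split key, parameterized by whether the chunk is the first and/or last one. The sketch below shows the rough shape of such SQL under the assumption of a single split key column; it is an illustration, not the connector's literal query builder.

```java
public class SplitScanQuerySketch {

    /** Illustrative only: the rough shape of the per-chunk snapshot scan SQL. */
    static String splitScanSql(String table, String splitKey, boolean isFirstSplit, boolean isLastSplit) {
        StringBuilder sql = new StringBuilder("SELECT * FROM " + table);
        if (isFirstSplit && isLastSplit) {
            return sql.toString();                                           // single chunk: full scan
        } else if (isFirstSplit) {
            sql.append(" WHERE ").append(splitKey).append(" < ?");           // (-inf, end)
        } else if (isLastSplit) {
            sql.append(" WHERE ").append(splitKey).append(" >= ?");          // [start, +inf)
        } else {
            sql.append(" WHERE ").append(splitKey).append(" >= ? AND ")
               .append(splitKey).append(" < ?");                             // [start, end)
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        System.out.println(splitScanSql("orders", "id", false, false));
        // SELECT * FROM orders WHERE id >= ? AND id < ?
    }
}
```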
### Phase 4: Data Deserialization and Conversion

#### 4.1 Debezium event conversion

```
SeaTunnelRowDebeziumDeserializeSchema.deserialize() → extractAfterRow()/extractBeforeRow()
```

Key function: `SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord()`

```java
private void deserializeDataChangeRecord(SourceRecord record, Collector<SeaTunnelRow> collector)
        throws Exception {
    Envelope.Operation operation = Envelope.operationFor(record);
    Struct messageStruct = (Struct) record.value();
    Schema valueSchema = record.valueSchema();
    TablePath tablePath = SourceRecordUtils.getTablePath(record);
    String tableId = tablePath.toString();

    // Look up the converters registered for this table
    SeaTunnelRowDebeziumDeserializationConverters converters = tableRowConverters.get(tableId);

    Long fetchTimestamp = SourceRecordUtils.getFetchTimestamp(record);
    Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(record);
    long delay = -1L;
    if (fetchTimestamp != null && messageTimestamp != null) {
        delay = fetchTimestamp - messageTimestamp;
    }

    if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) {
        // INSERT: rows read during the full snapshot are also emitted as INSERT
        SeaTunnelRow insert = extractAfterRow(converters, record, messageStruct, valueSchema);
        insert.setRowKind(RowKind.INSERT);
        insert.setTableId(tableId);
        MetadataUtil.setDelay(insert, delay);
        MetadataUtil.setEventTime(insert, fetchTimestamp);
        collector.collect(insert);
    } else if (operation == Envelope.Operation.DELETE) {
        // DELETE
        SeaTunnelRow delete = extractBeforeRow(converters, record, messageStruct, valueSchema);
        delete.setRowKind(RowKind.DELETE);
        delete.setTableId(tableId);
        MetadataUtil.setDelay(delete, delay);
        MetadataUtil.setEventTime(delete, fetchTimestamp);
        collector.collect(delete);
    } else if (operation == Envelope.Operation.UPDATE) {
        // UPDATE: emit two rows, UPDATE_BEFORE and UPDATE_AFTER
        SeaTunnelRow before = extractBeforeRow(converters, record, messageStruct, valueSchema);
        before.setRowKind(RowKind.UPDATE_BEFORE);
        before.setTableId(tableId);
        MetadataUtil.setDelay(before, delay);
        MetadataUtil.setEventTime(before, fetchTimestamp);
        collector.collect(before);

        SeaTunnelRow after = extractAfterRow(converters, record, messageStruct, valueSchema);
        after.setRowKind(RowKind.UPDATE_AFTER);
        after.setTableId(tableId);
        MetadataUtil.setDelay(after, delay);
        MetadataUtil.setEventTime(after, fetchTimestamp);
        collector.collect(after);
    }
}
```

#### 4.2 Data type conversion

```
extractAfterRow() → converters.getAfterRowConverter().convert() → RowConverter
```

Key function: `SeaTunnelRowDebeziumDeserializationConverters.extractAfterRow()`

```java
public SeaTunnelRow extractAfterRow(SourceRecord record, Struct value, Schema valueSchema)
        throws Exception {
    Struct after = value.getStruct(Envelope.FieldName.AFTER);
    Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();

    SeaTunnelRow row = new SeaTunnelRow(fieldConverters.length);
    for (int i = 0; i < fieldConverters.length; i++) {
        String fieldName = fieldNames[i];
        Object fieldValue = after.get(fieldName);
        Object convertedValue = fieldConverters[i].convert(fieldValue);
        row.setField(i, convertedValue);
    }

    // Populate the metadata fields after the physical columns
    for (int i = 0; i < metadataConverters.length; i++) {
        row.setField(fieldConverters.length + i, metadataConverters[i].convert(record));
    }
    return row;
}
```
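Put concretely, one Debezium change event fans out into one or two SeaTunnelRows depending on the operation. The self-contained sketch below simply restates that mapping; the enum types here are local stand-ins for `Envelope.Operation` and SeaTunnel's `RowKind`, invented for this illustration.

```java
import java.util.List;

public class RowKindMappingSketch {

    /** Simplified stand-ins for Envelope.Operation and SeaTunnel's RowKind. */
    enum Op { READ, CREATE, UPDATE, DELETE }
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Which rows a single change event is fanned out into by the deserializer. */
    static List<RowKind> emittedKinds(Op op) {
        switch (op) {
            case READ:       // snapshot row
            case CREATE:     // binlog INSERT
                return List.of(RowKind.INSERT);
            case UPDATE:     // binlog UPDATE produces two rows
                return List.of(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER);
            case DELETE:     // binlog DELETE
                return List.of(RowKind.DELETE);
            default:
                return List.of();
        }
    }

    public static void main(String[] args) {
        // UPDATE t SET name='b' WHERE id=1  ->  [UPDATE_BEFORE, UPDATE_AFTER]
        System.out.println(emittedKinds(Op.UPDATE));
    }
}
```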
### Phase 5: Writing Data to ClickHouse

#### 5.1 Sink writer receives rows

```
ClickhouseSinkWriter.write() → shardRouter.getShard() → addIntoBatch()
```

Key function: `ClickhouseSinkWriter.write()`

```java
@Override
public void write(SeaTunnelRow element) throws IOException {
    // 1. Resolve the shard key value
    Object shardKey = null;
    if (StringUtils.isNotEmpty(this.option.getShardMetadata().getShardKey())) {
        int i = this.option.getSeaTunnelRowType().indexOf(this.option.getShardMetadata().getShardKey());
        shardKey = element.getField(i);
    }

    // 2. Route the row to its target shard
    ClickhouseBatchStatement statement = statementMap.get(shardRouter.getShard(shardKey));
    JdbcBatchStatementExecutor clickHouseStatement = statement.getJdbcBatchStatementExecutor();
    IntHolder sizeHolder = statement.getIntHolder();

    // 3. Add the row to the current batch
    addIntoBatch(element, clickHouseStatement);
    sizeHolder.setValue(sizeHolder.getValue() + 1);

    // 4. Flush when the batch is full
    if (sizeHolder.getValue() >= option.getBulkSize()) {
        flush(clickHouseStatement);
        sizeHolder.setValue(0);
    }
}
```

#### 5.2 Shard routing algorithm

```
ShardRouter.getShard() → HASH_INSTANCE.hash() → TreeMap.lowerEntry()
```

Key function: `ShardRouter.getShard()`

```java
public Shard getShard(Object shardValue) {
    if (!splitMode) {
        return shards.firstEntry().getValue();
    }
    if (StringUtils.isEmpty(shardKey) || shardValue == null) {
        // No shard key: pick a shard at random (weighted)
        return shards.lowerEntry(threadLocalRandom.nextInt(shardWeightCount) + 1).getValue();
    }
    // Consistent hashing with XXHash64
    int offset =
            (int)
                    ((HASH_INSTANCE.hash(
                                            ByteBuffer.wrap(
                                                    shardValue.toString().getBytes(StandardCharsets.UTF_8)),
                                            0)
                                    & Long.MAX_VALUE)
                            % shardWeightCount);
    return shards.lowerEntry(offset + 1).getValue();
}
```

#### 5.3 Batch executor

```
JdbcBatchStatementExecutorBuilder.build() → SimpleBatchStatementExecutor → executeBatch()
```

Key function: `SimpleBatchStatementExecutor.executeBatch()`

```java
@Override
public void executeBatch() throws SQLException {
    if (batchCount > 0) {
        try {
            // Execute the batched insert
            statement.executeBatch();
            connection.commit();
            batchCount = 0;
        } catch (SQLException e) {
            connection.rollback();
            throw new ClickhouseConnectorException(
                    CommonErrorCodeDeprecated.FLUSH_DATA_FAILED,
                    "Clickhouse execute batch statement error", e);
        }
    }
}
```

#### 5.4 Field value injection

```
JdbcRowConverter.toExternal() → fieldInjectFunctionMap.get().injectFields()
```

Key function: `JdbcRowConverter.toExternal()`

```java
public PreparedStatement toExternal(SeaTunnelRow row, PreparedStatement statement) throws SQLException {
    for (int i = 0; i < projectionFields.length; i++) {
        String fieldName = projectionFields[i];
        Object fieldValue = fieldGetterMap.get(fieldName).apply(row);
        if (fieldValue == null) {
            statement.setObject(i + 1, null);
            continue;
        }
        // Bind the value with the inject function registered for this field
        fieldInjectFunctionMap
                .getOrDefault(fieldName, DEFAULT_INJECT_FUNCTION)
                .injectFields(statement, i + 1, fieldValue);
    }
    return statement;
}
```
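The shard lookup works because `shards` is a TreeMap keyed by each shard's starting offset in the cumulative weight space, which is what makes `lowerEntry(offset + 1)` resolve a hash offset in `[0, shardWeightCount)` to the shard owning that slot. The sketch below reproduces only that lookup with plain strings standing in for `Shard` objects; the particular weight layout is an assumption made for the example.

```java
import java.util.TreeMap;

public class WeightedShardLookupSketch {

    public static void main(String[] args) {
        // Assumed layout: shard-a has weight 2 (slots 0,1), shard-b weight 1 (slot 2),
        // shard-c weight 3 (slots 3,4,5). Keys are the starting slot of each shard.
        TreeMap<Integer, String> shards = new TreeMap<>();
        shards.put(0, "shard-a");
        shards.put(2, "shard-b");
        shards.put(3, "shard-c");
        int shardWeightCount = 6;

        // lowerEntry(offset + 1) returns the entry with the greatest key strictly
        // less than offset + 1, i.e. the shard whose slot range contains the offset.
        for (int offset = 0; offset < shardWeightCount; offset++) {
            String shard = shards.lowerEntry(offset + 1).getValue();
            System.out.println("offset " + offset + " -> " + shard);
        }
        // offsets 0,1 -> shard-a; offset 2 -> shard-b; offsets 3,4,5 -> shard-c
    }
}
```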
### Phase 6: Switching to the Incremental Phase

#### 6.1 Snapshot completion detection

```
IncrementalSourceReader.onSplitFinished() → reportFinishedSnapshotSplitsIfNeed()
```

Key function: `IncrementalSourceReader.onSplitFinished()`

```java
@Override
protected void onSplitFinished(Map<String, SourceSplitStateBase> finishedSplitIds) {
    for (SourceSplitStateBase splitState : finishedSplitIds.values()) {
        SourceSplitBase sourceSplit = splitState.toSourceSplit();
        checkState(
                sourceSplit.isSnapshotSplit()
                        && sourceSplit.asSnapshotSplit().isSnapshotReadFinished(),
                String.format(
                        "Only snapshot split could finish, but the actual split is incremental split %s",
                        sourceSplit));
        finishedUnackedSplits.put(sourceSplit.splitId(), sourceSplit.asSnapshotSplit());
    }
    reportFinishedSnapshotSplitsIfNeed();
    context.sendSplitRequest(); // request the next split
}
```

#### 6.2 Incremental split assignment

```
HybridSplitAssigner.getNext() → incrementalSplitAssigner.getNext()
```

Key function: `IncrementalSplitAssigner.getNext()`

```java
@Override
public Optional<SourceSplitBase> getNext() {
    if (remainingTables.isEmpty()) {
        return Optional.empty();
    }
    // Create one incremental split per table
    TableId tableId = remainingTables.iterator().next();
    remainingTables.remove(tableId);

    // The incremental split starts from the snapshot's high watermark
    IncrementalSplit split =
            new IncrementalSplit(
                    tableId.toString(),
                    Collections.singletonList(tableId),
                    completedSnapshotSplitWatermarks.get(tableId).getHighWatermark(),
                    stopOffset,
                    Collections.emptyList());
    return Optional.of(split);
}
```

#### 6.3 Binlog event handling

```
MySqlBinlogFetchTask.execute() → MySqlStreamingChangeEventSource.execute() → handleEvent()
```

Key function: `MySqlBinlogFetchTask.MySqlBinlogSplitReadTask.handleEvent()`

```java
@Override
protected void handleEvent(MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
    super.handleEvent(partition, offsetContext, event);
    // Check whether this bounded read should stop
    if (isBoundedRead()) {
        final BinlogOffset currentBinlogOffset = getBinlogPosition(offsetContext.getOffset());
        // Stop reading the binlog once the high watermark is reached
        if (currentBinlogOffset.isAtOrAfter(binlogSplit.getStopOffset())) {
            // Emit the binlog END watermark event
            dispatcher.dispatchWatermarkEvent(
                    partition.getSourcePartition(),
                    binlogSplit,
                    currentBinlogOffset,
                    WatermarkKind.END);
            // Tell the reader that this task has finished
            ((MySqlSnapshotFetchTask.SnapshotBinlogSplitChangeEventSourceContext) context).finished();
        }
    }
}
```

## Key Data Conversions

### 1. MySQL types to SeaTunnel types

Converter: `MySqlTypeUtils.convertFromColumn()`

```java
public static SeaTunnelDataType convertFromColumn(Column column, RelationalDatabaseConnectorConfig config) {
    String mysqlType = column.typeName().toUpperCase();
    switch (mysqlType) {
        case "INT":
        case "INTEGER":
            return BasicType.INT_TYPE;
        case "BIGINT":
            return BasicType.LONG_TYPE;
        case "VARCHAR":
        case "TEXT":
            return BasicType.STRING_TYPE;
        case "DATETIME":
        case "TIMESTAMP":
            return LocalTimeType.LOCAL_DATE_TIME_TYPE;
        case "DECIMAL":
        case "NUMERIC":
            return new DecimalType(column.length(), column.scale());
        default:
            return BasicType.STRING_TYPE;
    }
}
```

### 2. SeaTunnel types to ClickHouse types

Converter: `JdbcRowConverter.createFieldInjectFunctionMap()`

```java
private Map<String, ClickhouseFieldInjectFunction> createFieldInjectFunctionMap(
        String[] fields, Map<String, String> clickhouseTableSchema) {
    Map<String, ClickhouseFieldInjectFunction> fieldInjectFunctionMap = new HashMap<>();
    for (String field : fields) {
        String fieldType = clickhouseTableSchema.get(field);
        // Pick the first inject function that handles this ClickHouse column type,
        // falling back to the String-based one
        ClickhouseFieldInjectFunction injectFunction =
                Arrays.asList(
                                new ArrayInjectFunction(),
                                new MapInjectFunction(),
                                new BigDecimalInjectFunction(),
                                new DateTimeInjectFunction(),
                                new LongInjectFunction(),
                                new DoubleInjectFunction(),
                                new IntInjectFunction(),
                                new StringInjectFunction())
                        .stream()
                        .filter(f -> f.isCurrentFieldType(unwrapCommonPrefix(fieldType)))
                        .findFirst()
                        .orElse(new StringInjectFunction());
        fieldInjectFunctionMap.put(field, injectFunction);
    }
    return fieldInjectFunctionMap;
}
```

### 3. Row conversion pipeline

```
MySQL ResultSet → Debezium SourceRecord → SeaTunnelRow → ClickHouse PreparedStatement
```

The full conversion chain:

1. MySQL ResultSet: the raw database query result
2. Debezium SourceRecord: the change event in Debezium's format
3. SeaTunnelRow: SeaTunnel's unified in-flight row format
4. ClickHouse PreparedStatement: the statement finally written to ClickHouse

## Exactly-Once Processing

### 1. Watermark mechanism

- Low watermark: the binlog position when the snapshot starts
- High watermark: the binlog position when the snapshot ends

```java
// Low watermark
final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
dispatcher.dispatchWatermarkEvent(partition, snapshotSplit, lowWatermark, WatermarkKind.LOW);

// Snapshot the data
createDataEvents(ctx, snapshotSplit.getTableId());

// High watermark
final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
dispatcher.dispatchWatermarkEvent(partition, snapshotSplit, highWatermark, WatermarkKind.HIGH);
```

### 2. Backfill (incremental compensation)

When `lowWatermark != highWatermark`, changes were committed while the snapshot was running, so a backfill read is required:

```java
// Create the backfill incremental split
IncrementalSplit backfillSplit =
        new IncrementalSplit(
                splitId,
                Collections.singletonList(tableId),
                lowWatermark,   // start at the low watermark
                highWatermark,  // stop at the high watermark
                new ArrayList());

// Run the backfill read
MySqlBinlogSplitReadTask backfillTask = new MySqlBinlogSplitReadTask(...);
backfillTask.execute(context, partition, offsetContext);
```
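Whether a backfill is needed at all boils down to comparing the two watermarks. Below is a minimal sketch of that decision, assuming a simple value object standing in for a binlog position; the `BinlogPos`/`needsBackfill` names are invented for this illustration, not the connector API.

```java
public class BackfillDecisionSketch {

    /** Minimal stand-in for a binlog position (file name + offset within the file). */
    record BinlogPos(String file, long position) {}

    /**
     * A snapshot split only needs a backfill read of the binlog range
     * (lowWatermark, highWatermark] when changes were committed while the
     * snapshot query was running, i.e. when the two watermarks differ.
     */
    static boolean needsBackfill(BinlogPos lowWatermark, BinlogPos highWatermark) {
        return !lowWatermark.equals(highWatermark);
    }

    public static void main(String[] args) {
        BinlogPos low = new BinlogPos("mysql-bin.000003", 154);
        BinlogPos high = new BinlogPos("mysql-bin.000003", 982);
        System.out.println(needsBackfill(low, high)); // true  -> replay binlog from low to high
        System.out.println(needsBackfill(low, low));  // false -> the snapshot alone is consistent
    }
}
```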
### 3. Offset atomicity

```java
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    // Commit the change log offset when the checkpoint completes
    dataSourceDialect.commitChangeLogOffset(snapshotChangeLogOffset);
}
```

## Performance Optimizations

### 1. Parallelism control

- Snapshot phase: multiple splits are read in parallel, speeding up the full load
- Incremental phase: a single thread processes events sequentially to preserve event order

### 2. Batching

- Batch size: configurable via the `bulk_size` option (default 20000)
- Batch commits: fewer network round trips and less transaction overhead

### 3. Memory management

- Streaming processing: avoids keeping large amounts of data in memory
- Connection pooling: reuses database connections and reduces connection setup cost

## Error Handling and Fault Tolerance

### 1. Connection failures

```java
try {
    statement.executeBatch();
    connection.commit();
} catch (SQLException e) {
    connection.rollback();
    throw new ClickhouseConnectorException(
            CommonErrorCodeDeprecated.FLUSH_DATA_FAILED,
            "Clickhouse execute batch statement error", e);
}
```

### 2. Type mismatches

```java
// Fall back to the default inject function for unknown types
fieldInjectFunctionMap
        .getOrDefault(fieldName, DEFAULT_INJECT_FUNCTION)
        .injectFields(statement, i + 1, fieldValue);
```

### 3. Retries

- Connection retries: configurable via the `connect.max-retries` option
- Automatic reconnection after the database connection is lost

## Monitoring and Metrics

### 1. CDC metrics

```java
// Fetch delay
recordFetchDelay.set(fetchDelay > 0 ? fetchDelay : 0);
// Emit delay
recordEmitDelay.set(emitDelay > 0 ? emitDelay : 0);
// Delayed-event monitoring
if (delayedEventLimiter.acquire(messageTimestamp)) {
    eventListener.onEvent(new MessageDelayedEvent(emitDelay, element.toString()));
}
```

### 2. ClickHouse metrics

- Batch size: number of records in the current batch
- Write latency: time taken to write data into ClickHouse
- Error rate: proportion of failed writes

## Summary

The MySQL CDC to ClickHouse pipeline is a carefully designed, heavily optimized data path. Its main characteristics:

1. End-to-end data consistency
   - Watermarks plus backfill reads ensure no data is lost
   - Exactly-once processing semantics
   - Changes committed during the snapshot are handled automatically
2. High-performance parallel processing
   - Parallel multi-split reads during the snapshot phase
   - Batched writes to ClickHouse reduce network overhead
   - Shard-aware routing balances the write load
3. Flexible data conversion
   - Rich data type mapping
   - Automatic handling of DDL/schema changes
   - Rich metadata carried on every row
4. Enterprise-grade reliability
   - Comprehensive error handling and retry mechanisms
   - Failure recovery and resumable reads
   - Detailed monitoring metrics
5. Resource-efficient design
   - Streaming processing avoids out-of-memory issues
   - Connection reuse lowers resource consumption
   - Batch management balances throughput and latency

This architecture lets SeaTunnel synchronize MySQL change data into ClickHouse efficiently and reliably in near real time, providing a solid data foundation for real-time analytics.