多线程读取并解析csv

张开发
2026/4/17 8:36:38 15 分钟阅读

分享文章

多线程读取并解析csv
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/*
 * Multi-threaded CSV reading and parsing (producer / consumer over a bounded queue).
 *
 * All classes are kept in one compilation unit so the listing compiles as-is.
 * Only Main is public; the helper classes are package-private, which is
 * equivalent for callers because the file declares no package.
 */

/**
 * 1. Extraction strategy: encapsulates how to pick the wanted columns out of a raw CSV row.
 */
class ColumnExtractionStrategy {

    /** Zero-based column indices to extract, in output order. */
    private final int[] indices;

    /**
     * @param indices zero-based indices of the columns to keep, in the desired output order
     */
    public ColumnExtractionStrategy(int... indices) {
        // Defensive copy so later changes to the caller's array cannot alter the strategy.
        this.indices = indices.clone();
    }

    /**
     * Extracts the configured columns from one raw row.
     *
     * @param rawColumns all columns of the row as produced by splitting the line
     * @return one trimmed value per configured index; an empty string is used when
     *         the index does not exist in {@code rawColumns}
     */
    public List<String> extract(String[] rawColumns) {
        List<String> result = new ArrayList<>(indices.length);
        for (int index : indices) {
            if (index >= 0 && index < rawColumns.length) {
                result.add(rawColumns[index].trim()); // strip surrounding whitespace
            } else {
                result.add(""); // out-of-range index: emit an empty value instead of throwing
            }
        }
        return result;
    }
}

/**
 * 2. Strategy registry: statically holds the extraction rule for every known file type.
 */
class StrategyConfig {

    /**
     * Registry: key is a substring expected in the file name, value is the strategy.
     * A LinkedHashMap keeps registration order, so when a file name contains several
     * keys the first registered one wins deterministically.
     */
    private static final Map<String, ColumnExtractionStrategy> STRATEGY_MAP = new LinkedHashMap<>();

    static {
        // Type 3 CSV: extract columns 3, 4, 5, 6 (array indices)
        STRATEGY_MAP.put("type3", new ColumnExtractionStrategy(3, 4, 5, 6));
        // Type 4 CSV: extract columns 2, 4
        STRATEGY_MAP.put("type4", new ColumnExtractionStrategy(2, 4));
        // Type 5 CSV: extract columns 0, 1 (assumed to be longitude / latitude)
        STRATEGY_MAP.put("type5", new ColumnExtractionStrategy(0, 1));
    }

    private StrategyConfig() {
        // static registry, not meant to be instantiated
    }

    /**
     * Looks up the strategy whose key is contained in the given file name.
     *
     * @param fileName file name or path to match against the registered keys
     * @return the first matching strategy, or {@code null} when nothing matches
     *         (callers treat {@code null} as "pass all columns through")
     */
    public static ColumnExtractionStrategy getStrategy(String fileName) {
        if (fileName == null) {
            return null;
        }
        for (Map.Entry<String, ColumnExtractionStrategy> entry : STRATEGY_MAP.entrySet()) {
            if (fileName.contains(entry.getKey())) {
                System.out.println("匹配到策略: " + entry.getKey());
                return entry.getValue();
            }
        }
        return null;
    }
}

/**
 * 3. Generic row object: the 1-based data-row number plus the (possibly filtered) columns.
 */
class RawRow {

    private final long rowIndex;
    private final String[] columns;

    /**
     * @param rowIndex position of the row among the data rows (header excluded)
     * @param columns  column values carried by this row
     */
    public RawRow(long rowIndex, String[] columns) {
        this.rowIndex = rowIndex;
        this.columns = columns;
    }

    public long getRowIndex() {
        return rowIndex;
    }

    public String[] getColumns() {
        return columns;
    }
}

/**
 * 4. Producer: reads a CSV file, filters the columns through a strategy and
 * publishes the rows to the queue in batches.
 */
class CsvProducer {

    /** Number of rows collected before a batch is handed to the queue. */
    private static final int BATCH_SIZE = 1000;

    private final BlockingQueue<List<RawRow>> queue;

    /**
     * @param queue queue that receives the row batches
     */
    public CsvProducer(BlockingQueue<List<RawRow>> queue) {
        this.queue = queue;
    }

    /**
     * Reads the file line by line and enqueues the rows in batches of {@code BATCH_SIZE}.
     * The first line is treated as a header and skipped. If the calling thread is
     * interrupted while waiting for queue space, reading stops and the interrupt
     * flag is restored.
     *
     * @param filePath path of the CSV file
     * @param strategy extraction strategy; when {@code null} all columns are passed through
     * @throws IOException if the file cannot be opened or read
     */
    public void produce(String filePath, ColumnExtractionStrategy strategy) throws IOException {
        // NOTE: FileReader decodes with the platform default charset, as in the original listing.
        try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
            // Skip the header; an empty file has nothing to produce.
            String header = br.readLine();
            if (header == null) {
                return;
            }

            String line;
            long rowIndex = 0;
            List<RawRow> buffer = new ArrayList<>(BATCH_SIZE);

            while ((line = br.readLine()) != null) {
                rowIndex++;
                // Naive split: quoted fields containing commas are not supported.
                String[] rawCols = line.split(",");

                // Core step: let the strategy decide which columns survive.
                String[] finalCols;
                if (strategy != null) {
                    finalCols = strategy.extract(rawCols).toArray(new String[0]);
                } else {
                    finalCols = rawCols;
                }
                buffer.add(new RawRow(rowIndex, finalCols));

                // Hand over a full batch and start a fresh buffer (no copy needed).
                if (buffer.size() >= BATCH_SIZE) {
                    queue.put(buffer);
                    buffer = new ArrayList<>(BATCH_SIZE);
                }
            }

            // Flush the remaining rows.
            if (!buffer.isEmpty()) {
                queue.put(buffer);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt so the caller can observe the cancellation.
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * 5. Consumer: takes batches from the queue and assembles each row into a Map
 * (simulating a MyBatis insert).
 */
class MapBasedConsumer implements Runnable {

    /**
     * End-of-stream marker shared by every producer and consumer. It is recognised
     * by identity, so the party sending the stop signal must enqueue exactly this
     * instance.
     */
    static final List<RawRow> POISON_PILL = Collections.unmodifiableList(new ArrayList<RawRow>());

    private final BlockingQueue<List<RawRow>> queue;
    private final int threadId;

    /**
     * Target field names (e.g. database columns). The order must match the column
     * order of the extraction strategy: for type3 = [3,4,5,6] this would be
     * [fieldA, fieldB, fieldC, fieldD].
     */
    private final List<String> targetFields;

    /**
     * @param queue        queue to take row batches from
     * @param threadId     identifier used in the progress output
     * @param targetFields field names, positionally aligned with the row columns
     */
    public MapBasedConsumer(BlockingQueue<List<RawRow>> queue, int threadId, List<String> targetFields) {
        this.queue = queue;
        this.threadId = threadId;
        this.targetFields = targetFields;
    }

    /**
     * Processes batches until {@link #POISON_PILL} is received or the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (true) {
                List<RawRow> batch = queue.take();
                if (batch == POISON_PILL) {
                    break;
                }

                for (RawRow row : batch) {
                    Map<String, Object> dataMap = toFieldMap(row);
                    // Simulated MyBatis insert: this is where dataMap would be passed
                    // to something like genericMapper.insert(dataMap).
                }
                System.out.println("线程 " + threadId + " 处理了一批: " + batch.size());
            }
        } catch (InterruptedException e) {
            // Restore the flag and let the worker end.
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            e.printStackTrace();
        }
    }

    /** Maps the row's columns onto the target field names, position by position. */
    private Map<String, Object> toFieldMap(RawRow row) {
        String[] cols = row.getColumns();
        // Bounded by both sizes so a mismatched configuration cannot index out of range.
        int limit = Math.min(cols.length, targetFields.size());
        Map<String, Object> dataMap = new HashMap<>();
        for (int i = 0; i < limit; i++) {
            dataMap.put(targetFields.get(i), cols[i]);
        }
        return dataMap;
    }
}

/**
 * 6. Program entry point.
 */
public class Main {

    /** Number of consumer threads; the same number of stop signals is sent. */
    private static final int CONSUMER_COUNT = 4;

    /** Shared queue between the producer and the consumers. */
    private static final BlockingQueue<List<RawRow>> queue = new ArrayBlockingQueue<>(10);

    /** Same instance the consumers compare against. */
    private static final List<RawRow> POISON_PILL = MapBasedConsumer.POISON_PILL;

    public static void main(String[] args) {
        String filePath = "data_type3.csv"; // test file name

        // 1. Pick the strategy from the file name.
        ColumnExtractionStrategy strategy = StrategyConfig.getStrategy(filePath);

        // 2. Start the consumers.
        // The consumers need the target field names to assemble the Map. In a real
        // project this list could live in StrategyConfig or come from annotations.
        // For the demo, type3 is assumed to map to name, age, city, phone.
        List<String> dbFields = Arrays.asList("name", "age", "city", "phone");

        ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_COUNT);
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            executor.submit(new MapBasedConsumer(queue, i, dbFields));
        }

        // 3. Run the producer on the main thread.
        CsvProducer producer = new CsvProducer(queue);
        try {
            producer.produce(filePath, strategy);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // 4. Send one stop signal per consumer.
        try {
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                queue.put(POISON_PILL);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Could not deliver every stop signal: interrupt the workers instead.
            executor.shutdownNow();
        }

        // Wait for the workers so the completion message is only printed once they are done.
        executor.shutdown();
        try {
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                System.err.println("Consumers did not finish within the timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("程序执行完毕");
    }
}

更多文章