Preface: I have been looking into big data again recently and wanted to use Flume to incrementally import data from MySQL into HDFS, compute various metrics with Spark RDD or Spark SQL, and finally write the results to a target directory on HDFS.

Flume version: /app/apache-flume-1.9.0-bin

1. Add the Maven dependencies

 <dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>5.1.31</version>
 </dependency>
 <dependency>
   <groupId>org.apache.flume</groupId>
   <artifactId>flume-ng-core</artifactId>
   <version>1.9.0</version>
 </dependency>
 <dependency>
   <groupId>org.apache.flume</groupId>
   <artifactId>flume-ng-configuration</artifactId>
   <version>1.9.0</version>
 </dependency>
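
Since the Flume agent already ships flume-ng-core and flume-ng-configuration in its lib/ directory, those two dependencies can optionally be marked as provided so they are not bundled into the custom-source jar. A minimal sketch:

 <dependency>
   <groupId>org.apache.flume</groupId>
   <artifactId>flume-ng-core</artifactId>
   <version>1.9.0</version>
   <scope>provided</scope>
 </dependency>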

2. The custom source code

MySQLSourceHelper.java:

package flume;
import org.apache.flume.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MySQLSourceHelper {

    private static final Logger LOG = LoggerFactory.getLogger(MySQLSourceHelper.class);

    // starting offset value
    private static Long startNum = 0L;
    // name of the column used as the offset
    private static String startField = "";

    // file that stores the current offset
    private static String startStatusFile;
    private static final Long DEFAULT_START_VALUE = 0L;

    private static String dbUrl, dbUser, dbPassword, dbDriver;
    private static Connection conn = null;
    private static PreparedStatement ps = null;
    
    private static String select = "";

    // Obtain a JDBC connection
    private static Connection getConnection() {
        try {
            Class.forName(dbDriver);
            return DriverManager.getConnection(dbUrl, dbUser, dbPassword);
        } catch (Exception e) {
            LOG.error("Failed to connect to the database", e);
        }
        return null;
    }

    // Constructor: read parameters from the Flume agent configuration,
    // falling back to the defaults where none are given
    MySQLSourceHelper(Context context) {
        startNum = context.getLong("sql.startNum", DEFAULT_START_VALUE);
        startField = context.getString("sql.startField");
        startStatusFile = context.getString("sql.startStatusFile");
        select = context.getString("sql.select");

        dbUrl = context.getString("db.url");
        dbUser = context.getString("db.user");
        dbPassword = context.getString("db.password");
        dbDriver = context.getString("db.driver");
        conn = getConnection();
    }

    // quick local test of the query logic
    public static void main(String[] args){
        dbUrl = "jdbc:mysql://hechaojie.com/health?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull";
        dbUser = "root";
        dbPassword = "hechaojie_com@2019";
        dbDriver = "com.mysql.jdbc.Driver";
        conn = getConnection();
    
        select = "select * from hea_weight t where t.create_at > $create_at order by t.create_at asc limit 10";
        startField = "create_at";
        
        System.out.println(executeQuery());
    }
    
    
    // Build the SQL statement, substituting the current offset for the $<startField> placeholder
    private static String buildQuery() {
        return select.replace("$" + startField, String.valueOf(getStartStatus(startNum)));
    }

    // Execute the query; returns the rows plus the largest offset value seen
    static Map<String,Object> executeQuery() {
        try {
            // The SQL must be rebuilt on every call because the offset changes
            String customQuery = buildQuery();
            // Holds the result rows
            List<List<Object>> results = new ArrayList<>();

            ps = conn.prepareStatement(customQuery);
            // Use the no-argument executeQuery(); passing SQL to a PreparedStatement again is an error
            ResultSet result = ps.executeQuery();
    
            // Locate the offset column in the result set
            int startColumn = -1;
            int columnCount = ps.getMetaData().getColumnCount();
            for (int i = 1; i <= columnCount; i++) {
                if (ps.getMetaData().getColumnName(i).equals(startField)) {
                    startColumn = i;
                    break;
                }
            }
            LOG.debug("startColumn: " + startColumn);
            
            // Largest offset value seen in this batch
            long maxNum = -1;

            while (result.next()) {
                // One row of data (all of its columns)
                List<Object> row = new ArrayList<>();
                for (int i = 1; i <= columnCount; i++) {
                    // Track the largest value of the offset column
                    if (startColumn == i && maxNum < result.getLong(i)) {
                        maxNum = result.getLong(i);
                    }
                    row.add(result.getObject(i));
                }
                results.add(row);
            }
            LOG.info("maxNum: " + maxNum);
            LOG.info("execSql: " + customQuery + "\tresultSize: " + results.size());
            
            Map<String,Object> resultMap = new HashMap<String,Object>();
            resultMap.put("list",results);
            resultMap.put("maxNum",maxNum);
            return resultMap;
        } catch (SQLException e) {
            LOG.error("Query failed", e);
            // conn = getConnection(); // reconnect here if desired
        }
        return null;
    }

    // Convert the result set to strings: each row (a list of columns) becomes one comma-separated string
    List<String> getAllRows(List<List<Object>> queryResult) {
        List<String> allRows = new ArrayList<>();
        StringBuilder row = new StringBuilder();
        for (List<Object> rawRow : queryResult) {
            for (Object aRawRow : rawRow) {
                if (aRawRow == null) {
                    row.append(",");
                } else {
                    row.append(aRawRow.toString()).append(",");
                }
            }
            allRows.add(row.toString());
            row = new StringBuilder();
        }
        return allRows;
    }
    
    // Persist the latest offset to the status file
    public void updateOffsetStatus(Long startNum) {
        File file = new File(startStatusFile);
        // FileOutputStream creates the file if it does not exist yet
        try (OutputStream out = new FileOutputStream(file)) {
            out.write(String.valueOf(startNum).getBytes());
        } catch (IOException e) {
            LOG.error("Failed to write offset file " + startStatusFile, e);
        }
    }
    
    // Read the whole content of a file into a string
    public static String readFileContent(String fileName) {
        StringBuilder sbf = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
            String tempStr;
            while ((tempStr = reader.readLine()) != null) {
                sbf.append(tempStr);
            }
        } catch (IOException e) {
            LOG.error("Failed to read offset file " + fileName, e);
        }
        return sbf.toString();
    }
    
    /**
     * Determine the starting offset: use the value persisted in the status file
     * if it exists, otherwise fall back to the configured startNum.
     */
    private static Long getStartStatus(Long startNum) {
        File file = new File(startStatusFile);
        if (file.exists()) {
            return Long.parseLong(readFileContent(startStatusFile).trim());
        }
        return startNum;
    }

    // Close JDBC resources
    void close() {
        try {
            if (ps != null) {
                ps.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (SQLException e) {
            LOG.error("Failed to close JDBC resources", e);
        }
    }

}
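
After every successful batch, updateOffsetStatus overwrites the status file with nothing but the largest offset value seen, as plain text, for example (a hypothetical create_at value):

 1559376000000

On the next poll, and after a restart, getStartStatus reads this value back, so the query resumes where it left off instead of starting from sql.startNum again.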

MySQLSource.java:

package flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MySQLSource extends AbstractSource implements Configurable, PollableSource {

    // Logger
    private static final Logger LOG = LoggerFactory.getLogger(MySQLSource.class);

    // Helper that does the actual JDBC work
    private MySQLSourceHelper sqlSourceHelper;

    // Interval between two queries, in milliseconds
    private int queryDelay;
    private static final int DEFAULT_QUERY_DELAY = 10000;

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    @Override
    public void configure(Context context) {
        // Initialize the helper and the polling interval from the agent configuration
        sqlSourceHelper = new MySQLSourceHelper(context);
        queryDelay = context.getInteger("query.delay", DEFAULT_QUERY_DELAY);
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // Events to be delivered to the channel
            List<Event> events = new ArrayList<>();

            // Query the table
            Map<String,Object> resultMap = sqlSourceHelper.executeQuery();
            if (resultMap == null) {
                // The query failed; back off and retry later
                return Status.BACKOFF;
            }
            List<List<Object>> result = (List<List<Object>>) resultMap.get("list");
            Long maxNum = Long.parseLong(String.valueOf(resultMap.get("maxNum")));
            // Event headers shared by all events in this batch
            HashMap<String, String> header = new HashMap<>();
            // If any rows came back, wrap each row in an event
            if (!result.isEmpty()) {
                List<String> allRows = sqlSourceHelper.getAllRows(result);
                Event event = null;
                for (String row : allRows) {
                    event = new SimpleEvent();
                    event.setHeaders(header);
                    event.setBody(row.getBytes());
                    events.add(event);
                }
                // Write the batch of events to the channel
                getChannelProcessor().processEventBatch(events);
                // Persist the new offset
                sqlSourceHelper.updateOffsetStatus(maxNum);
            }
            // Wait before the next poll
            Thread.sleep(queryDelay);
            return Status.READY;
        } catch (InterruptedException e) {
            LOG.error("Error processing row", e);
            return Status.BACKOFF;
        }
    }

    @Override
    public synchronized void stop() {
        LOG.info("Stopping sql source {} ...", getName());
        try {
            sqlSourceHelper.close();
        } finally {
            super.stop();
        }
    }
}
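
For the usr_user query used below, each Flume event body produced by getAllRows is one comma-separated row, with null columns rendered as empty strings, for example (hypothetical data):

 10001,13800000000,1559376000000,1,1,1990-01-01,nick_01,

Note the trailing comma after the last column; trim it downstream if it matters for your Spark parsing.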

3. Copy the MySQL JDBC driver to Flume's lib directory

cp mysql-connector-java-5.1.31.jar lib/

Then package the custom source as a jar and put it in the lib directory as well, as sketched below.
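
For example (a rough sketch; the actual jar name depends on your project's artifactId and version):

 mvn clean package
 cp target/flume-mysql-source-1.0.jar /app/apache-flume-1.9.0-bin/lib/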

4. Flume configuration file

conf/usr_user.conf

 # Name the components on this agent
 a1.sources = r1
 a1.sinks = k1
 a1.channels = c1
 # Describe/configure the source
 a1.sources.r1.type = flume.MySQLSource
 a1.sources.r1.db.driver = com.mysql.jdbc.Driver
 a1.sources.r1.db.url = jdbc:mysql://localhost/account?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
 a1.sources.r1.db.user = account
 a1.sources.r1.db.password = password
 # column used as the incremental offset (a timestamp here)
 a1.sources.r1.sql.startField=create_at
 # offset value to start syncing from
 a1.sources.r1.sql.startNum=-1
 # file that stores the current offset
 a1.sources.r1.sql.startStatusFile=offset_usr_user.txt
 # SQL statement used for the sync
 a1.sources.r1.sql.select=select id,phone,create_at,sex,`status`,birthdate,nick_name from usr_user t where t.create_at > $create_at order by t.create_at asc limit 10
 # define the hdfs sink
 a1.sinks.k1.type = hdfs
 a1.sinks.k1.hdfs.useLocalTimeStamp = true
 # HDFS path the data is written to
 a1.sinks.k1.hdfs.path = hdfs://host:19000/data/test/usr_user/%Y%m%d
 # roll a new file once the current one reaches 128 MB
 a1.sinks.k1.hdfs.rollSize = 134217728
 # do not roll based on the number of events
 a1.sinks.k1.hdfs.rollCount = 0
 # roll a new file every 3600 seconds
 a1.sinks.k1.hdfs.rollInterval = 3600
 a1.sinks.k1.hdfs.threadsPoolSize = 30
 # file name prefix
 a1.sinks.k1.hdfs.filePrefix = user
 a1.sinks.k1.hdfs.fileType = DataStream
 a1.sinks.k1.hdfs.writeFormat = Text
 # Describe the channel
 a1.channels.c1.type = memory
 a1.channels.c1.capacity = 1000
 a1.channels.c1.transactionCapacity = 100
 # Bind the source and sink to the channel
 a1.sources.r1.channels = c1
 a1.sinks.k1.channel = c1 

5. Start Flume in the background

nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/usr_user.conf -Dflume.root.logger=INFO,console >> usr_user.log &
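
Once the agent is running, you can check that data is landing in HDFS (the date directory below is just an example):

 hdfs dfs -ls /data/test/usr_user/20190601
 hdfs dfs -cat /data/test/usr_user/20190601/user.* | head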

Known issue: if the Flume agent dies, any events still sitting in the channel are lost, because the memory channel keeps them only in memory. How to avoid losing channel data will be discussed in a later post.
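
One common mitigation, shown here only as a sketch (the directories are placeholders), is to replace the memory channel with a file channel, which persists events to disk at the cost of some throughput:

 a1.channels.c1.type = file
 a1.channels.c1.checkpointDir = /app/flume-data/checkpoint
 a1.channels.c1.dataDirs = /app/flume-data/data
 a1.channels.c1.transactionCapacity = 100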
