采用mybatis配置 参考:https://blog.csdn.net/weixin_43291055/article/details/100152026 只需要在sink的时候自定义sink即可,核心代码如下: public class SinkTest extends RichSinkFunction { private UserMapper userMapper; @Override public void open(Configuration parameters) { try { userMapper = UserDao.getInstance(); } catch (Exception e) { System.out.println(e); } } public void invoke(TestModel value, Context context) { try { Info info = new Info(); info.setId(value.getId()); info.setName(value.getName()); userMapper.insert(info); } catch (Exception e) { System.out.println(e); } } } 注意:UserDao.getInstance() 单例 public class UserDao { private volatile static UserMapper userMapper; private UserDao() { } public static UserMapper getInstance() { if (userMapper == null) { synchronized (UserDao.class) { if (userMapper == null) { ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); userMapper = ctx.getBean(UserMapper.class); } } } return userMapper; } }