@李玉珏 你好,想跟你请教个问题:内存模式都是REPLICATED。我在个人电脑上单台测试挺快。部署到2台服务器,以及自己个人电脑,3台一起跑结果慢的要死。看起来像都远程访问了同一个缓存似的。关闭一台,另外一台又快点。
内存服务器完全按照你的手册里堆内配置,配置的8G. 我个人电脑配置2G,其它的也是堆内配置。
内存加载是在idea开发工具里启动加载的,然后就启动分布式计算。
按照个人想法是启动多台服务器后,再启动idea的节点加载内存,然后内存会自动复制到所有其它服务器,然后再启动各自计算。
把配置文件拿出来看看?
######
<?xml version="1.0" encoding="UTF8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"> <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> <!-- Set to true to enable distributed class loading for examples, default is false. --> <property name="peerClassLoadingEnabled" value="true"/> <property name="discoverySpi"> <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> <property name="ipFinder"> <!-- Ignite provides several options for automatic discovery that can be used instead os static IP based discovery. For information on all options refer to our documentation: http://apacheignite.readme.io/docs/cluster-config --> <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. --> <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">--> <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> <property name="addresses"> <list> <!-- In distributed environment, replace with actual host IP address. --> <value>127.0.0.1:48500..48509</value> </list> </property> </bean> </property> </bean> </property> <!-- Explicitly configure TCP communication SPI changing local port number for the nodes from the first cluster. --> <property name="communicationSpi"> <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi"> <property name="localPort" value="48100"/> </bean> </property> <!-- Enable task execution events for examples. --> <property name="includeEventTypes"> <list> <!--Task execution events--> <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/> <!--Cache events--> <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/> <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/> </list> </property> </bean> <bean id="igniteCacheFactory" class="com.ignite.IgniteCacheFactory"> <constructor-arg index="0" ref="typesMap" /> <constructor-arg index="1" ref="queryEntitiesMap" /> <constructor-arg index="2" ref="ignite.cfg" /> <constructor-arg index="3" value="REPLICATED" /> </bean> <bean id="typesMap" class="com.ignite.TypesMap"> <property name="list"> <list> <bean class="org.apache.ignite.cache.store.jdbc.JdbcType"> <property name="databaseSchema" value="mishu"/> <property name="databaseTable" value="cert_aqrz_copy"/> <property name="keyType" value="com.toubiaomishu.mojo.CertAqrzCopyKey"/> <property name="valueType" value="com.toubiaomishu.mojo.CertAqrzCopy"/> <property name="keyFields"> <list> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <property name="databaseFieldType"> <util:constant static-field="java.sql.Types.VARCHAR"/> </property> <property name="databaseFieldName" value="srcUUid"/> <property name="javaFieldType" value="java.lang.String"/> <property name="javaFieldName" value="srcuuid"/> </bean> </list> </property> <property name="valueFields"> <list> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <property name="databaseFieldType"> <util:constant static-field="java.sql.Types.VARCHAR"/> </property> <property name="databaseFieldName" value="srcUUid"/> <property name="javaFieldType" value="java.lang.String"/> <property name="javaFieldName" value="srcuuid"/> </bean> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <property name="databaseFieldType"> <util:constant static-field="java.sql.Types.VARCHAR"/> </property> <property name="databaseFieldName" value="secureLevel"/> <property name="javaFieldType" value="java.lang.String"/> <property name="javaFieldName" value="securelevel"/> </bean> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <property name="databaseFieldType"> <util:constant static-field="java.sql.Types.VARCHAR"/> </property> <property name="databaseFieldName" value="secureRank"/> <property name="javaFieldType" value="java.lang.String"/> <property name="javaFieldName" value="securerank"/> </bean> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <property name="databaseFieldType"> <util:constant static-field="java.sql.Types.VARCHAR"/> </property> <property name="databaseFieldName" value="yxqTime"/> <property name="javaFieldType" value="java.lang.String"/> <property name="javaFieldName" value="yxqtime"/> </bean> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <property name="databaseFieldType"> <util:constant static-field="java.sql.Types.VARCHAR"/> </property> <property name="databaseFieldName" value="zhUUid"/> <property name="javaFieldType" value="java.lang.String"/> <property name="javaFieldName" value="zhuuid"/> </bean> </list> </property> </bean> </list> </property> </bean> <bean id="queryEntitiesMap" class="com.ignite.QueryEntitiesMap"> <property name="list"> <list> <bean class="org.apache.ignite.cache.QueryEntity"> <property name="keyType" value="com.toubiaomishu.mojo.CertAqrzCopyKey"/> <property name="valueType" value="com.toubiaomishu.mojo.CertAqrzCopy"/> <property name="fields"> <util:map map-class="java.util.LinkedHashMap"> <entry key="srcuuid" value="java.lang.String"/> <entry key="securelevel" value="java.lang.String"/> <entry key="securerank" value="java.lang.String"/> <entry key="yxqtime" value="java.lang.String"/> <entry key="zhuuid" value="java.lang.String"/> </util:map> </property> <property name="indexes"> <list> <bean class="org.apache.ignite.cache.QueryIndex"> <property name="name" value="PRIMARY"/> <property name="indexType"> <util:constant static-field="org.apache.ignite.cache.QueryIndexType.SORTED"/> </property> <property name="fields"> <map> <entry key="srcuuid" value="true"/> </map> </property> </bean> <bean class="org.apache.ignite.cache.QueryIndex"> <property name="name" value="srcUUid"/> <property name="indexType"> <util:constant static-field="org.apache.ignite.cache.QueryIndexType.SORTED"/> </property> <property name="fields"> <map> <entry key="srcuuid" value="true"/> <entry key="yxqtime" value="true"/> <entry key="zhuuid" value="true"/> </map> </property> </bean> </list> </property> </bean> </list> </property> </bean> <!-- Datasource for sample in-memory mysql database. --> <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost:3306/mishu?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull"/> <property name="username" value="root"/> <property name="password" value="5201128blue"/> <!-- 连接池启动时的初始值 --> <property name="initialSize" value="10"/> <!-- 连接池的最大值 --> <property name="maxActive" value="100"/> <!-- 最大空闲值.当经过一个高峰时间后,连接池可以慢慢将已经用不到的连接慢慢释放一部分,一直减少到maxIdle为止 --> <property name="maxIdle" value="10"/> <!-- 最小空闲值.当空闲的连接数少于阀值时,连接池就会预申请去一些连接,以免洪峰来时来不及申请 --> <property name="minIdle" value="5"/> <property name="removeAbandoned" value="true"/> <property name="removeAbandonedTimeout" value="10"/> <property name="testWhileIdle"> <value>true</value> </property> <property name="testOnBorrow"> <value>true</value> </property> <property name="testOnReturn"> <value>true</value> </property> <property name="validationQuery"> <value>SELECT 1</value> </property> <property name="validationQueryTimeout"> <value>1</value> </property> <property name="timeBetweenEvictionRunsMillis"> <value>3600000</value> </property> <property name="minEvictableIdleTimeMillis"> <value>60000</value> </property> <property name="numTestsPerEvictionRun"> <value>5</value> </property> </bean> </beans>
package com.ignite; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory; import org.apache.ignite.cache.store.jdbc.JdbcType; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; //这个类根据JdbcType和QueryEntity创建缓存 public class IgniteCacheFactory { public IgniteCacheFactory(TypesMap typesMap,QueryEntitiesMap queryEntitiesMap,IgniteConfiguration conf,CacheMode cacheMode){ init(typesMap.getList(),queryEntitiesMap,conf,cacheMode); } public void init(List<JdbcType> list,QueryEntitiesMap queryEntitiesMap,IgniteConfiguration conf,CacheMode cacheMode) { CacheConfiguration[] cacheConflist = new CacheConfiguration[list.size()]; for(int i=0;i<list.size();i++){ JdbcType qe=list.get(i); String cacheName=qe.getDatabaseTable(); qe.setCacheName(cacheName); String valueType=qe.getValueType(); CacheConfiguration cfg=IgniteCacheConfigurationFactory.create(cacheName); CacheJdbcPojoStoreFactory sf= new CacheJdbcPojoStoreFactory(); sf.setDataSourceBean("dataSource"); sf.setTypes(qe); cfg.setCacheMode(cacheMode);//CacheMode.PARTITIONED cfg.setCacheStoreFactory(sf); cacheConflist[i]=cfg; List<QueryEntity> qlist=new ArrayList<>(); qlist.add(queryEntitiesMap.get(valueType)); cfg.setQueryEntities(qlist); } conf.setCacheConfiguration(cacheConflist); System.out.println("----------------->>------------------"); } }
调用任务代码,先加载,在执行分布式任务:
public static void main(String[] args) throws IgniteException{ //Ignition.setClientMode(true); Ignite ignite=Ignition.start("ignite2.xml"); IgniteCluster cluster = ignite.cluster(); // 启动之后调用,直接加载所有定的表数据进缓存 Object[] list= (Object[]) ignite.cacheNames().toArray(); for(int i=0;i<list.length;i++){ String cacheName=String.valueOf(list[i]); IgniteCache cahce= ignite.cache(cacheName); cahce.loadCache(null); } CreditCalculate c=new CreditCalculate(); c.setGc(3); c.setSc(5); c.setGb(new BigDecimal(20)); c.setSb(new BigDecimal(20)); c.setSqlstr("(SELECT.... THEN ...."); c.setAq("1"); c.setIdd(new BigDecimal(3)); c.setProjTy("房程"); c.setXydjDqq("xydjDqq"); IgniteCompute compute = ignite.compute(cluster.forRemotes()); // Execute task on the clustr and wait for its completion. List<String> cnt = compute.execute(CreditCalculateTask.class,c); //System.out.println(">>> Total number of characters in the phrase is '" + cnt.size() + "'."); //System.out.println(">>>"+ cnt.get(0)); System.out.println("----------------mapreduce end-------------------"); }
半夜醒来:会不会是因为我缓存是在任务外声明的,所以执行任务是引用都是同一台机器上的缓存
// Inject Ignite instance. @IgniteInstanceResource private Ignite ignite; IgniteCache certSrcCache = ignite.cache("cert_src");//******这里声明 @Override public List<ComputeJob> split(int gridSize, CreditCalculate arg) { List<ComputeJob> jobs = new ArrayList<>(); SqlQuery sql2 = new SqlQuery(CertSrc.class, "from ..."); QueryCursor<Cache.Entry<Long, CertSrc>> companyList = certSrcCache.query(sql2); ... for (Cache.Entry<Long, CertSrc> e : companyList) { jobs.add(new ComputeJobAdapter() { @Override public Object execute() { .... SqlFieldsQuery sql = new SqlFieldsQuery(sql1); try (QueryCursor<List<?>> cursor = certSrcCache.query(sql)) {//*******这里在ComputeJobAdapter内部引用
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。