|
package dataload;
import com.tangosol.net.CacheFactory; import com.tangosol.net.InvocationService; import com.tangosol.net.Member; import com.tangosol.net.NamedCache; import com.tangosol.net.PartitionedService; import com.tangosol.util.InvocableMap;
import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Hashtable; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set;
import javax.naming.Context; import javax.naming.InitialContext;
public class LoaderUsingEP { private Connection m_con; public Connection getConnection() { try { Context ctx = null; Hashtable<String,String> ht = new Hashtable<String,String>(); ht.put(Context.INITIAL_CONTEXT_FACTORY,"weblogic.jndi.WLInitialContextFactory"); ht.put(Context.PROVIDER_URL,"t3://localhost:7001"); ctx = new InitialContext(ht); javax.sql.DataSource ds= (javax.sql.DataSource) ctx.lookup("ds"); m_con = ds.getConnection(); } catch (Exception e) { System.out.println(e.getMessage()); } return m_con; }
protected Set getStorageMembers(NamedCache cache) { return ((PartitionedService) cache.getCacheService()) .getOwnershipEnabledMembers(); } protected Map<Member, List<String>> divideWork(Set members) { Iterator i = members.iterator(); Map<Member, List<String>> mapWork = new HashMap(members.size()); try { String sql = "select count(*) from persons"; int totalcount = 0; int membercount = members.size(); Connection con = getConnection(); Statement st = con.createStatement(); ResultSet rs = st.executeQuery(sql); while (rs.next()) totalcount = Integer.parseInt(rs.getString(1)); int onecount = totalcount / membercount; int lastcount = totalcount % membercount; sql = "select id from persons"; ResultSet rs1 = st.executeQuery(sql); int count = 0; int currentworker=0; ArrayList<String> list=new ArrayList<String>(); while (rs1.next()) { if (count < onecount) { list.add(rs1.getString("id")); count++; } else { Member member = (Member) i.next(); ArrayList<String> list2=new ArrayList<String>(); list2.addAll(list); mapWork.put(member, list2); list.clear(); /* print the list value for(Map.Entry<Member, List<String>> entry:mapWork.entrySet()){ System.out.println("first="+entry.getKey()); List<String> memberKeys = entry.getValue(); for(int j = 0; j < memberKeys.size(); j++) { System.out.println("firsttime="+memberKeys.get(j)); //System.out.println(list.get(i)); } } */ count=0; list.add(rs1.getString("id")); count++; currentworker ++; if (currentworker == membercount-1) { onecount = onecount+lastcount; }
} } Member member = (Member) i.next(); mapWork.put(member, list); st.close(); con.close(); } catch (Exception e) { System.out.println("Exception...."+e.getMessage()); } for(Map.Entry<Member, List<String>> entry:mapWork.entrySet()){ System.out.println("final="+entry.getKey()); List<String> memberKeys = entry.getValue(); for(int j = 0; j < memberKeys.size(); j++) { System.out.println(memberKeys.get(j)); } } return mapWork; } public void load() { NamedCache cache = CacheFactory.getCache("SampleCache");
Set members = getStorageMembers(cache); System.out.println("members"+members.size());
Map<Member, List<String>> mapWork = divideWork(members);
InvocationService service = (InvocationService) CacheFactory.getService("LocalInvocationService"); for (Map.Entry<Member, List<String>> entry : mapWork.entrySet()) { Member member = entry.getKey(); List<String> memberKeys = entry.getValue(); System.out.println(memberKeys.size()); MyLoadInvocable task = new MyLoadInvocable(memberKeys, cache.getCacheName()); service.execute(task, Collections.singleton(member), null); } } public static void main(String[] args) { LoaderUsingEP ep = new LoaderUsingEP(); ep.load(); } }
|