1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.fjank.jcache.distribution;
20
21 import java.util.Enumeration;
22 import org.fjank.jcache.CacheImpl;
23 import org.jgroups.Address;
24 import org.jgroups.ChannelClosedException;
25 import org.jgroups.ChannelException;
26 import org.jgroups.ChannelNotConnectedException;
27 import org.jgroups.JChannel;
28 import org.jgroups.Membership;
29 import org.jgroups.MembershipListener;
30 import org.jgroups.Message;
31 import org.jgroups.MessageListener;
32 import org.jgroups.View;
33 import org.jgroups.blocks.PullPushAdapter;
34
35 public class JGroupsDistributionEngine
36 extends DistributionEngine
37 implements MessageListener, MembershipListener
38 {
39 private static JGroupsDistributionEngine _singleton;
40
41 public static synchronized JGroupsDistributionEngine instanceOf(CacheImpl cache)
42 {
43 if (_singleton == null)
44 {
45 _singleton = new JGroupsDistributionEngine(cache);
46 }
47 return _singleton;
48 }
49
50 private JChannel channel;
51
52 private Membership members = new Membership();
53
54
55 private JGroupsDistributionEngine(CacheImpl cache)
56
57 {
58 this.cache = cache;
59 if (cache.getAttributes().isDistributed())
60 {
61 try
62 {
63 channel = new JChannel(null);
64 channel.connect("FKacheOS");
65 new PullPushAdapter(channel, this, this);
66 }
67 catch (ChannelException e)
68 {
69 throw new IllegalStateException(e.getMessage());
70 }
71 }
72
73 }
74
75 public void block()
76 {
77 }
78
79
80
81
82 public Enumeration getCacheAddr()
83 {
84 return members.getMembers().elements();
85 }
86
87 protected JChannel getChannel()
88 {
89 return channel;
90 }
91
92
93
94
95 public byte[] getState()
96 {
97 return new byte[0];
98 }
99
100 public void receive(Message msg)
101 {
102
103 if (msg.getObject() instanceof ClusterNotification
104 && !msg.getSrc().equals(channel.getLocalAddress()))
105 {
106 ClusterNotification clusterNotification =
107 (ClusterNotification) msg.getObject();
108 handleClusterNotification(clusterNotification);
109 }
110 }
111
112 public void sendNotification(ClusterNotification clusterNotification)
113
114 {
115 if (cache.getAttributes().isDistributed())
116 {
117 Message message = new Message();
118 message.setObject(clusterNotification);
119 try
120 {
121 channel.send(message);
122 }
123 catch (ChannelNotConnectedException e)
124 {
125 throw new IllegalStateException(e.getMessage());
126 }
127 catch (ChannelClosedException e)
128 {
129 throw new IllegalStateException(e.getMessage());
130 }
131 }
132 }
133
134
135
136
137 public void setState(byte[] state)
138 {
139
140 }
141
142 public void suspect(Address suspected_mbr)
143 {
144 }
145
146
147
148
149 public void viewAccepted(View new_view)
150 {
151 members.add(new_view.getMembers());
152 }
153 }