View Javadoc

/*    Open Source Java Caching Service
*    Copyright (C) 2002 Frank Karlstrøm
*    This library is free software; you can redistribute it and/or
*    modify it under the terms of the GNU Lesser General Public
*    License as published by the Free Software Foundation; either
*    version 2.1 of the License, or (at your option) any later version.
*
*    This library is distributed in the hope that it will be useful,
*    but WITHOUT ANY WARRANTY; without even the implied warranty of
*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
*    Lesser General Public License for more details.
*
*    You should have received a copy of the GNU Lesser General Public
*    License along with this library; if not, write to the Free Software
*    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*
*    The author can be contacted by email: fjankk@users.sourceforge.net
*/
19  package org.fjank.jcache.distribution;
20  
21  import java.util.Enumeration;
22  import org.fjank.jcache.CacheImpl;
23  import org.jgroups.Address;
24  import org.jgroups.ChannelClosedException;
25  import org.jgroups.ChannelException;
26  import org.jgroups.ChannelNotConnectedException;
27  import org.jgroups.JChannel;
28  import org.jgroups.Membership;
29  import org.jgroups.MembershipListener;
30  import org.jgroups.Message;
31  import org.jgroups.MessageListener;
32  import org.jgroups.View;
33  import org.jgroups.blocks.PullPushAdapter;
34  
35  public class JGroupsDistributionEngine
36      extends DistributionEngine
37      implements MessageListener, MembershipListener
38  {
39      private static JGroupsDistributionEngine _singleton;
40  
41      public static synchronized JGroupsDistributionEngine instanceOf(CacheImpl cache)
42      {
43          if (_singleton == null)
44          {
45              _singleton = new JGroupsDistributionEngine(cache);
46          }
47          return _singleton;
48      }
49  
50      private JChannel channel;
51  
52      private Membership members = new Membership();
53      
54  
55      private JGroupsDistributionEngine(CacheImpl cache)
56          
57      {
58          this.cache = cache;
59          if (cache.getAttributes().isDistributed())
60          {
61              try
62              {
63                  channel = new JChannel(null);
64                  channel.connect("FKacheOS");
65                  new PullPushAdapter(channel, this, this);
66              }
67              catch (ChannelException e)
68              {
69                  throw new IllegalStateException(e.getMessage());
70              }
71          }
72  
73      }
74  
75      public void block()
76      {
77      }
78  
79      /**
80       *
81       */
82      public Enumeration getCacheAddr()
83      {
84          return members.getMembers().elements();
85      }
86  
87      protected JChannel getChannel()
88      {
89          return channel;
90      }
91  
92      /**
93       * state not currently in use.
94       */
95      public byte[] getState()
96      {
97          return new byte[0];
98      }
99  
100     public void receive(Message msg)
101     {
102         // Only process messages coming from other members in the group
103         if (msg.getObject() instanceof ClusterNotification
104             && !msg.getSrc().equals(channel.getLocalAddress()))
105         {
106             ClusterNotification clusterNotification =
107                 (ClusterNotification) msg.getObject();
108             handleClusterNotification(clusterNotification);
109         }
110     }
111 
112     public void sendNotification(ClusterNotification clusterNotification)
113        
114     {
115         if (cache.getAttributes().isDistributed())
116         {
117             Message message = new Message();
118             message.setObject(clusterNotification);
119             try
120             {
121                 channel.send(message);
122             }
123             catch (ChannelNotConnectedException e)
124             {
125                 throw new IllegalStateException(e.getMessage());
126             }
127             catch (ChannelClosedException e)
128             {
129                 throw new IllegalStateException(e.getMessage());
130             }
131         }
132     }
133 
134     /**
135      * state not currently in use.
136      */
137     public void setState(byte[] state)
138     {
139 
140     }
141 
142     public void suspect(Address suspected_mbr)
143     {
144     }
145 
146     /** Is called when new members arrives or leaves the group.
147      * @see org.jgroups.MembershipListener#viewAccepted(org.jgroups.View)
148      */
149     public void viewAccepted(View new_view)
150     {
151         members.add(new_view.getMembers());
152     }
153 }