import org.apache.catalina.connector.Connector;
-import java.net.MulticastSocket;
-import java.net.InetAddress;
-import java.net.DatagramPacket;
-import java.io.UnsupportedEncodingException;
-
import org.apache.tomcat.util.modeler.Registry;
/*
public void setTtl(int ttl) { this.ttl = ttl; }
public int getTtl() { return ttl; }
+ /**
+ * Proxy list, format "address:port,address:port".
+ */
+ protected String proxyList = null;
+ public String getProxyList() { return proxyList; }
+ public void setProxyList(String proxyList) { this.proxyList = proxyList; }
+
+ /**
+ * URL prefix.
+ */
+ protected String proxyURL = "/HeartbeatListener";
+ public String getProxyURL() { return proxyURL; }
+ public void setProxyURL(String proxyURL) { this.proxyURL = proxyURL; }
+
private CollectedInfo coll = null;
private Sender sender = null;
Object source = event.getLifecycle();
if (Lifecycle.PERIODIC_EVENT.equals(event.getType())) {
if (sender == null) {
- sender = new MultiCastSender();
- sender.init(this);
+ if (proxyList == null)
+ sender = new MultiCastSender();
+ else
+ sender = new TcpSender();
+
+ try {
+ sender.init(this);
+ } catch (Exception ex) {
+ log.error("Unable to initialize Sender: " + ex);
+ sender = null;
+ return;
+ }
}
/* Read busy and ready */
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.ha.backend;
+
+import java.net.InetAddress;
+
+/*
+ * This class represents a front-end httpd server.
+ *
+ */
+public class Proxy {
+
+ protected enum State { OK, ERROR, DOWN };
+
+ public InetAddress address = null;
+ public int port = 80;
+ public State state = State.OK;
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.ha.backend;
+
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.InetAddress;
+import java.io.UnsupportedEncodingException;
+import java.util.StringTokenizer;
+
+/*
+ * Sender to proxies using multicast socket.
+ */
+public class TcpSender
+ implements Sender {
+
+ private static Log log = LogFactory.getLog(HeartbeatListener.class);
+
+ HeartbeatListener config = null;
+
+ /**
+ * Proxies.
+ */
+ protected Proxy[] proxies = null;
+
+
+ /**
+ * Active connections.
+ */
+
+ protected Socket[] connections = null;
+ protected BufferedReader[] connectionReaders = null;
+ protected BufferedWriter[] connectionWriters = null;
+
+
+ public void init(HeartbeatListener config) throws Exception {
+ this.config = config;
+ StringTokenizer tok = new StringTokenizer(config.getProxyList(), ",");
+ proxies = new Proxy[tok.countTokens()];
+ int i = 0;
+ while (tok.hasMoreTokens()) {
+ String token = tok.nextToken().trim();
+ int pos = token.indexOf(':');
+ if (pos <=0)
+ throw new Exception("bad ProxyList");
+ proxies[i] = new Proxy();
+ proxies[i].port = Integer.parseInt(token.substring(pos + 1));
+ try {
+ proxies[i].address = InetAddress.getByName(token.substring(0, pos));
+ } catch (Exception e) {
+ throw new Exception("bad ProxyList");
+ }
+ i++;
+ }
+ connections = new Socket[proxies.length];
+ connectionReaders = new BufferedReader[proxies.length];
+ connectionWriters = new BufferedWriter[proxies.length];
+
+ }
+
+ public int send(String mess) throws Exception {
+ if (connections == null) {
+ log.error("Not initialized");
+ return -1;
+ }
+ String requestLine = "POST " + config.getProxyURL() + " HTTP/1.0";
+
+ for (int i = 0; i < connections.length; i++) {
+ if (connections[i] == null) {
+ try {
+ connections[i] = new Socket(proxies[i].address, proxies[i].port);
+ connectionReaders[i] = new BufferedReader(new InputStreamReader(connections[i].getInputStream()));
+ connectionWriters[i] = new BufferedWriter(new OutputStreamWriter(connections[i].getOutputStream()));
+ } catch (Exception ex) {
+ log.error("Unable to connect to proxy: " + ex);
+ close(i);
+ }
+ }
+ if (connections[i] == null)
+ continue; // try next proxy in the list
+ BufferedWriter writer = connectionWriters[i];
+ try {
+ writer.write(requestLine);
+ writer.write("\r\n");
+ writer.write("Content-Length: " + mess.length() + "\r\n");
+ writer.write("User-Agent: HeartbeatListener/1.0\r\n");
+ writer.write("Connection: Keep-Alive\r\n");
+ writer.write("\r\n");
+ writer.write(mess);
+ writer.write("\r\n");
+ writer.flush();
+ } catch (Exception ex) {
+ log.error("Unable to send collected load information to proxy: " + ex);
+ close(i);
+ }
+ if (connections[i] == null)
+ continue; // try next proxy in the list
+
+ /* Read httpd answer */
+ String responseStatus = connectionReaders[i].readLine();
+ if (responseStatus == null) {
+ log.error("Unable to read response from proxy");
+ close(i);
+ continue;
+ } else {
+ responseStatus = responseStatus.substring(responseStatus.indexOf(' ') + 1, responseStatus.indexOf(' ', responseStatus.indexOf(' ') + 1));
+ int status = Integer.parseInt(responseStatus);
+ if (status != 200) {
+ log.error("Status is " + status);
+ close(i);
+ continue;
+ }
+
+ // read all the headers.
+ String header = connectionReaders[i].readLine();
+ int contentLength = 0;
+ while (!"".equals(header)) {
+ int colon = header.indexOf(':');
+ String headerName = header.substring(0, colon).trim();
+ String headerValue = header.substring(colon + 1).trim();
+ if ("content-length".equalsIgnoreCase(headerName)) {
+ contentLength = Integer.parseInt(headerValue);
+ }
+ }
+ if (contentLength > 0) {
+ char[] buf = new char[512];
+ while (contentLength > 0) {
+ int thisTime = (contentLength > buf.length) ? buf.length : contentLength;
+ int n = connectionReaders[i].read(buf, 0, thisTime);
+ if (n <= 0) {
+ log.error("Read content failed");
+ close(i);
+ break;
+ } else {
+ contentLength -= n;
+ }
+ }
+ }
+ }
+
+ }
+
+ return 0;
+ }
+
+ /**
+ * Close connection.
+ */
+ protected void close(int i) {
+ try {
+ if (connectionReaders[i] != null) {
+ connectionReaders[i].close();
+ }
+ } catch (IOException e) {
+ }
+ connectionReaders[i] = null;
+ try {
+ if (connectionWriters[i] != null) {
+ connectionWriters[i].close();
+ }
+ } catch (IOException e) {
+ }
+ connectionWriters[i] = null;
+ try {
+ if (connections[i] != null) {
+ connections[i].close();
+ }
+ } catch (IOException e) {
+ }
+ connections[i] = null;
+ }
+}