/* FileQueue.java */
package com.stimulus.util;
/*
Copyright (c) 2005-2011 Jamie Band
GNU Lesser General Public License
*/
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public abstract class FileQueue implements Runnable {
public enum ProcessResult { PROCESS_SUCCESS, PROCESS_FAIL_REQUEUE, PROCESS_FAIL_NOQUEUE };
protected final Log logger = LogFactory.getLog(FileQueue .class);
protected String name;
protected int processThreads = 6;
protected int maxRetryDays = 7;
protected int maxRetries = 7;
protected int retryIntervalMinutes = 1;
protected ExecutorService processPool = null;
protected static int DEAD_PERIOD = 6000000;
protected static int MAX_NO_DELETES_BEFORE_CLEANUP = 100;
protected static int WRITE_QUEUE_SIZE = 500;
protected AtomicInteger processCount = new AtomicInteger();
protected ArrayBlockingQueue writeQueue;
protected Queue writeBackQueue;
protected ShutdownHook shutdownHook;
protected AtomicBoolean started = new AtomicBoolean();
protected long period;
protected TimeUnit timeunit;
protected int maxItemsPerCycle = 1000;
protected RandomAccessFile rndFile;
protected FileChannel rndFileChannel;
protected FileLock rndFileLock;
protected ExecuteInfo execInfo = new ExecuteInfo();
protected AtomicBoolean queueFileOpened = new AtomicBoolean();
protected ReentrantLock removeLock = new ReentrantLock();
public FileQueue(String name) {
this.name = name;
writeQueue = new ArrayBlockingQueue(WRITE_QUEUE_SIZE);
writeBackQueue = new ConcurrentLinkedQueue();
shutdownHook = new ShutdownHook();
started.getAndSet(false);
queueFileOpened.getAndSet(false);
}
public void startQueueProcessor( long period, TimeUnit timeunit) {
if (started.compareAndSet(false,true)) {
this.period = period;
this.timeunit = timeunit;
logger.debug("start queue processor {name='"+name+"',file='"+getQueueFile()+"'}");
try {
execInfo = resetCounters();
} catch (Exception e) {
logger.error("failed to set queue counters:"+e.getMessage(),e);
}
openQueue();
processPool = Executors.newFixedThreadPool(processThreads+1);
processPool.execute(this);
Runtime.getRuntime().addShutdownHook(shutdownHook);
} else {
logger.debug("queue processor already started {name='"+name+"',file='"+getQueueFile()+"'}");
}
}
public void stopQueueProcessor() {
if (started.compareAndSet(true,false)) {
logger.debug("stop queue processor {writeQueue='"+writeQueue.size()+"',name='"+name+"',file='"+getQueueFile()+"'}");
if (processPool!=null) {
processPool.shutdown();
}
if (shutdownHook!=null) {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
try {
processQueueItems();
} catch (Exception e) {
logger.error("failed to process remaining queue items during shutdown:"+e.getMessage(),e);
}
logger.debug("queue processor stopped {writeQueue='"+writeQueue.size()+"',name='"+name+"',file='"+getQueueFile()+"'}");
} else {
logger.debug("queue processor already stopped {writeQueue='"+writeQueue.size()+"',name='"+name+"',file='"+getQueueFile()+"'}");
}
}
public void run() {
try {
processQueueItems();
} catch (Throwable e) {
logger.error("failed to process file queue:"+e.getMessage(),e);
}
}
public void openQueue() throws OverlappingFileLockException {
try {
ensureQueueFilePresent();
removeProcessedItemsFromQueue();
openQueueFile();
try {
activateAllQueueItems(rndFile);
} finally {
try {
closeQueueFile();
} catch (Throwable t) {
logger.error("error occurred while closing queue:"+t.getMessage(),t);
}
}
} catch (Throwable t) {
logger.error("error occurred while opening queue:"+t.getMessage(),t);
}
}
public void processQueueItems() throws FileQueueException {
while (started.get()) {
try {
try {
ProcessItem head = null;
try {
head = writeQueue.poll(period,timeunit);
} catch (InterruptedException nse) {
continue;
}
if (head==null && getQueueFile().length()==0) {
continue;
}
openQueueFile();
if (execInfo.delCount.get()<0 || execInfo.actCount.get()<0 || execInfo.procCount.get()<0) {
execInfo = resetCounters();
}
if (head==null && execInfo.delCount.get()>0 && execInfo.actCount.get()<=0 && execInfo.procCount.get()<=0 && writeBackQueue.size()==0) {
removeProcessedItemsFromQueue();
continue;
}
if (head!=null) {
rndFile.seek(rndFile.length());
writeAllQueuedItems((Queue)writeQueue, rndFile, head);
}
rndFile.seek(0);
executeAllQueueItems(rndFile);
if (writeBackQueue.size()>0) {
writeAllQueuedItems((Queue)writeBackQueue, rndFile, null);
continue;
}
if (writeBackQueue.size()==0 && execInfo.procCount.get()<=0 && execInfo.delCount.get()>=MAX_NO_DELETES_BEFORE_CLEANUP) {
removeProcessedItemsFromQueue();
}
} catch (Exception e) {
throw new FileQueueException("exception occured: "+e.getMessage(),e,logger);
} finally {
closeQueueFile();
}
} catch (Throwable t) {
logger.error("error occurred during file queue process loop:"+t.getMessage(),t);
}
}
}
public void queueItem(FileQueueItem fileQueueItem) throws FileQueueException {
if (fileQueueItem == null) {
throw new FileQueueException("precondition: attempt to insert null item to queue.");
}
if (fileQueueItem.getValue() == null) {
throw new FileQueueException("precondition: attempt to insert item with null content in queue.");
}
execInfo.actCount.getAndIncrement();
try {
writeQueue.put(new ProcessItem(fileQueueItem,-1));
} catch (InterruptedException ie) {
throw new FileQueueException("failed to write item to cache. interrupted:"+ie.getMessage(),ie,logger);
}
}
public void writeItems(ArrayList items) throws FileQueueException {
if (items == null) {
throw new FileQueueException("precondition: items = null");
}
FileWriter fw = null;
BufferedWriter bw = null;
File queueFile = getQueueFile();
try {
fw = new FileWriter(queueFile, true);
bw = new BufferedWriter(fw);
for (FileQueueItem item : items) {
if (item==null || item.getValue()==null ) {
continue;
}
bw.write(item.toString());
bw.newLine();
}
} catch (IOException io) {
throw new FileQueueException("failed to write to queue:"+io.getMessage(),io,logger);
} finally {
if (bw != null) {
try { bw.close(); } catch (IOException e) { logger.debug("failed close buffer: "+e.getMessage()); }
try { fw.close(); } catch (IOException e) { logger.debug("failed close file: "+e.getMessage()); }
}
}
}
public boolean isExpired(FileQueueItem fileQueueItem) throws FileQueueException {
if (fileQueueItem == null) {
throw new FileQueueException("precondition: fileQueueItem = null");
}
//logger.debug("isexpired {item='"+fileQueueItem.getValue()+"'");
if (maxRetries != 0 && fileQueueItem.retries > maxRetries) {
return true;
}
if (maxRetryDays > 0) {
Date receivedDate = fileQueueItem.getReceivedDate();
Calendar deleteCalendar = Calendar.getInstance();
deleteCalendar.setTime(receivedDate);
deleteCalendar.add(Calendar.DATE, + maxRetryDays);
Date deleteDate = deleteCalendar.getTime();
return new Date().after(deleteDate);
}
return false;
}
public boolean shouldRetry(FileQueueItem fileQueueItem) {
if (fileQueueItem.getStatus()!=FileQueueItem.Status.ACTIVE) {
return false;
}
Date lastRetryDate = fileQueueItem.getLastRetryDate();
Calendar retryCalendar = Calendar.getInstance();
retryCalendar.setTime(lastRetryDate);
retryCalendar.add(Calendar.SECOND, + retryIntervalMinutes*60);
Date retryDate = retryCalendar.getTime();
if (new Date().after(retryDate)) {
logger.debug("try {item='"+fileQueueItem.getValue()+",retryDate='"+retryDate+"',result='yes'}");
return true;
} else {
//logger.debug("shouldretry {item='"+fileQueueItem.getValue()+",retryDate='"+retryDate+"',result='no'}");
return false;
}
}
public void removeProcessedItemsFromQueue() throws IOException {
BufferedWriter bw = null;
BufferedReader in = null;
File queueFile = getQueueFile();
File queueBackupFile = getQueueBackupFile();
logger.debug("remove processed items {name='"+name+"',file='"+getQueueFile()+"'}");
try {
queueBackupFile.delete();
if (!getQueueFile().exists() || getQueueFile().length()<1) {
logger.debug("skipping removal of queue items. queue is empty.");
return;
}
FileReader inReader = new FileReader(queueFile);
in = new BufferedReader(inReader);
bw = new BufferedWriter(new FileWriter(queueBackupFile, true));
String line;
while ((line = in.readLine()) != null) {
if (line.length()<=0) continue;
if (!started.get()) {
logger.debug("file queue "+name+" shutdown. exiting cleanly..");
break;
}
FileQueueItem fileQueueItem = newFileQueueItem();
try {
fileQueueItem.parse(line);
} catch (FileQueueException e) {
logger.debug("failed to parse line in queue "+name+":"+e.getMessage(),e);
logger.debug(">line:"+line);
continue;
}
if (fileQueueItem.getValue()==null) {
continue;
}
if (fileQueueItem.getStatus() != FileQueueItem.Status.DELETED) {
line = fileQueueItem.toString();
bw.write(line);
bw.newLine();
}
}
} finally {
if (bw!=null) { try { bw.close(); } catch (Exception e) {} }
if (in!=null) { try { in.close(); } catch (Exception e) {} }
}
boolean deleted = queueFile.delete();
if (!deleted) {
logger.error("failed to delete queue file {file='"+queueFile.getAbsolutePath()+"'}");
}
queueBackupFile.renameTo(queueFile);
execInfo.delCount.getAndSet(0);
}
public void changeFileQueueStatus(FileQueueItem fileQueueItem, FileQueueItem.Status newStatus) {
if (fileQueueItem.getStatus()==newStatus)
return;
switch (fileQueueItem.getStatus()) {
case PROCESSING: execInfo.procCount.getAndDecrement(); break;
case ACTIVE: execInfo.actCount.getAndDecrement();break;
case DELETED: execInfo.delCount.getAndDecrement();break;
}
switch (newStatus) {
case PROCESSING: execInfo.procCount.getAndIncrement();break;
case ACTIVE: execInfo.actCount.getAndIncrement();break;
case DELETED: execInfo.delCount.getAndIncrement();break;
}
fileQueueItem.setStatus(newStatus);
}
public class ProcessItem implements Runnable {
FileQueueItem fileQueueItem;
long pointer;
public ProcessItem(FileQueueItem fileQueueItem,long pointer) {
this.fileQueueItem = fileQueueItem;
this.pointer = pointer;
}
public void run() {
try {
if (fileQueueItem==null) {
return;
}
logger.debug("process item {item='"+fileQueueItem.getValue()+"'}");
if (processFileQueueItem(fileQueueItem)==ProcessResult.PROCESS_FAIL_REQUEUE) {
fileQueueItem.incRetries();
changeFileQueueStatus(fileQueueItem,FileQueueItem.Status.ACTIVE);
} else {
changeFileQueueStatus(fileQueueItem,FileQueueItem.Status.DELETED);
}
if (fileQueueItem.getStatus()!=FileQueueItem.Status.DELETED && isExpired(fileQueueItem)) {
expiredItem(fileQueueItem);
}
writeBackQueue.add(this);
} catch (Throwable t) {
logger.debug("failed to process item:"+t.getMessage()+" {value='"+fileQueueItem.getValue()+"'}",t);
}
}
}
private void ensureQueueFilePresent() throws FileQueueException {
if (!getQueueFile().exists()) {
try {
getQueueFile().createNewFile();
} catch (IOException io) {
throw new FileQueueException("failed to create queue file: "+io.getMessage()+
" {queueFile='"+getQueueFile().getPath()+"'}",io,logger);
}
}
}
private void writeAllQueuedItems(Queue queue, RandomAccessFile rndFile, ProcessItem head) throws IOException {
if (rndFile==null) {
throw new IOException("file handle is null");
}
ProcessItem writeItem = null;
long oldpos = rndFile.getFilePointer();
int i = 0;
do {
if (head!=null) {
writeItem = head;
i++;
head = null;
} else {
try {
if (i>maxItemsPerCycle) {
break;
}
writeItem = queue.poll();
if (writeItem == null) break;
i++;
} catch (NoSuchElementException nse) {
break;
}
}
long writePos = 0;
if (writeItem.pointer == -1) {
writePos = rndFile.length();
} else {
writePos = writeItem.pointer;
}
try {
rndFile.seek(writePos);
} catch (IOException io) {
logger.debug("failed to seek in queue file "+getQueueFile()+": "+io.getMessage(),io);
}
try {
rndFile.writeBytes(writeItem.fileQueueItem.toString());
rndFile.writeBytes("\n");
} catch (IOException io) {
logger.error("failed to write "+getQueueFile()+": "+io.getMessage(),io);
}
if (writeItem.pointer != -1) {
processCount.decrementAndGet();
// logger.debug("decprocesscount:"+processCount);
}
} while (i<=maxItemsPerCycle && writeItem!=null);
rndFile.seek(oldpos);
}
protected class ExecuteInfo {
AtomicInteger executed = new AtomicInteger();
AtomicInteger actCount = new AtomicInteger();
AtomicInteger procCount = new AtomicInteger();
AtomicInteger delCount = new AtomicInteger();
}
protected void activateAllQueueItems(RandomAccessFile rndFile) throws IOException {
if (rndFile==null) {
throw new IOException("precondition: rndfile is null");
}
long lastPos = 0;
String line;
while ((line = rndFile.readLine()) != null) {
//logger.debug(">"+line);
if (line.length()<=0) continue;
if (!started.get()) {
logger.debug("file queue "+name+" shutdown. exiting cleanly..");
break;
}
if (line.trim().length()<1) continue;
FileQueueItem fileQueueItem = newFileQueueItem();
try {
fileQueueItem.parse(line);
} catch (FileQueueException ae) {
logger.debug("failed to process item:"+ae.getMessage(),ae);
lastPos = rndFile.getFilePointer();
continue;
}
if (fileQueueItem.getStatus() == FileQueueItem.Status.DELETED || fileQueueItem.getValue()==null) {
lastPos = rndFile.getFilePointer();
continue;
}
try {
rndFile.seek(lastPos);
} catch (IOException io) {
logger.debug("failed to seek in queue file "+getQueueFile()+": "+io.getMessage(),io);
}
try {
changeFileQueueStatus(fileQueueItem,FileQueueItem.Status.ACTIVE);
rndFile.writeBytes(fileQueueItem.toString());
rndFile.writeBytes("\n");
} catch (IOException io) {
logger.error("failed to write "+getQueueFile()+": "+io.getMessage(),io);
}
lastPos = rndFile.getFilePointer();
}
}
protected void executeAllQueueItems(RandomAccessFile rndFile) throws IOException {
try {
if (rndFile==null) {
throw new IOException("precondition: rndfile is null");
}
long lastPos = rndFile.getFilePointer();
String line;
while ((line = rndFile.readLine()) != null) {
if (line.trim().length()<=0) {
lastPos = rndFile.getFilePointer();
continue;
}
//logger.debug(">"+line);
if (!started.get()) {
logger.debug("file queue "+name+" shutdown. exiting cleanly..");
break;
}
FileQueueItem fileQueueItem = newFileQueueItem();
try {
fileQueueItem.parse(line);
if (fileQueueItem.getValue()==null || fileQueueItem.getStatus() == FileQueueItem.Status.DELETED) {
lastPos = rndFile.getFilePointer();
continue;
}
} catch (FileQueueException ae) {
try {
rndFile.seek(lastPos);
} catch (IOException io) {
logger.debug("failed to seek in queue file "+getQueueFile()+": "+io.getMessage(),io);
}
try {
StringBuffer s = new StringBuffer();
for (int i=0;i s.append(' ');
}
rndFile.writeBytes(s.toString());
rndFile.writeBytes("\n");
} catch (IOException io) {
logger.error("failed to write "+getQueueFile()+": "+io.getMessage(),io);
}
lastPos = rndFile.getFilePointer();
continue;
}
if (shouldRetry(fileQueueItem)) {
fileQueueItem.setLastRetryDate(new Date());
changeFileQueueStatus(fileQueueItem,FileQueueItem.Status.PROCESSING);
//logger.debug("incprocesscount:"+processCount);
processCount.getAndIncrement();
try {
rndFile.seek(lastPos);
} catch (IOException io) {
logger.debug("failed to seek in queue file "+getQueueFile()+": "+io.getMessage(),io);
}
try {
rndFile.writeBytes(fileQueueItem.toString());
rndFile.writeBytes("\n");
} catch (IOException io) {
logger.error("failed to write "+getQueueFile()+": "+io.getMessage(),io);
}
ProcessItem pitem = new ProcessItem(fileQueueItem,lastPos);
//pitem.run();
processPool.execute(pitem);
}
lastPos = rndFile.getFilePointer();
}
} catch (Throwable t) {
logger.error(t);
}
}
protected void openQueueFile() throws FileQueueException {
if (queueFileOpened.compareAndSet(false,true)) {
try {
rndFile = new RandomAccessFile(getQueueFile(), "rw");
rndFileChannel = rndFile.getChannel();
rndFileLock = rndFileChannel.lock();
} catch (FileNotFoundException e2) {
throw new FileQueueException("failed to find queue file: "+e2.getMessage());
} catch (IOException io) {
try { rndFileLock.release(); } catch (Exception e) { logger.debug("failed to release file lock:"+e.getMessage());}
try { rndFileChannel.close(); } catch (Exception e) { logger.debug("failed to close file channel:"+e.getMessage());}
throw new FileQueueException("failed to obtain lock on queue file "+getQueueFile()+":"+io.getMessage());
}
}
}
protected void closeQueueFile() throws FileQueueException{
if (queueFileOpened.compareAndSet(true,false)) {
if (rndFile==null) {
throw new FileQueueException("precondition: rndfile is null");
}
Exception occurrance = null;
try {
if (rndFileLock!=null) {
rndFileLock.release();
}
} catch (Exception e) {
occurrance = e;
}
try {
if (rndFileChannel!=null) {
rndFileChannel.close();
}
} catch (Exception e) {
occurrance = e;
}
try {
if (rndFile != null) {
rndFile.close();
}
} catch (Exception e) {
occurrance = e;
}
if (occurrance!=null) {
throw new FileQueueException("failed to close queue file:"+occurrance.getMessage(),occurrance,logger);
}
}
}
public ExecuteInfo resetCounters() throws IOException {
BufferedReader in = null;
File queueFile = getQueueFile();
ExecuteInfo newExecInfo = new ExecuteInfo();
logger.debug("init queue counters {name='"+name+"',file='"+getQueueFile()+"'}");
try {
if (!getQueueFile().exists() || getQueueFile().length()<1) {
logger.debug("skipping queue counters. queue is empty.");
return newExecInfo;
}
FileReader inReader = new FileReader(queueFile);
in = new BufferedReader(inReader);
String line;
while ((line = in.readLine()) != null) {
if (line.length()<=0) continue;
if (!started.get()) {
logger.debug("file queue "+name+" shutdown. exiting cleanly..");
break;
}
FileQueueItem fileQueueItem = newFileQueueItem();
try {
fileQueueItem.parse(line);
} catch (FileQueueException e) {
logger.debug("failed to parse line in queue "+name+":"+e.getMessage(),e);
logger.debug(">line:"+line);
continue;
}
if (fileQueueItem.getValue()==null) {
continue;
}
switch (fileQueueItem.getStatus()) {
case DELETED: newExecInfo.delCount.getAndIncrement(); break;
case ACTIVE: newExecInfo.actCount.getAndIncrement(); break;
case PROCESSING: newExecInfo.procCount.getAndIncrement(); break;
}
}
} finally {
if (in!=null) { try { in.close(); } catch (Exception e) {} }
}
logger.debug("counter calcs complete");
return newExecInfo;
}
public class ShutdownHook extends Thread {
public void run() {
stopQueueProcessor();
}
}
public abstract File getQueueFile();
public abstract ProcessResult processFileQueueItem(FileQueueItem item);
public abstract void expiredItem(FileQueueItem item);
public abstract FileQueueItem newFileQueueItem();
public File getQueueBackupFile() {
return new File(getQueueFile().getAbsolutePath()+".bak");
}
public int getProcessThreads() { return processThreads; }
public int getMaxRetryDays() { return maxRetryDays; }
public int getMaxRetries() { return maxRetries; }
public int getRetryIntervalMinutes() { return retryIntervalMinutes; }
public void setProcessThreads(int processThreads) { this.processThreads = processThreads; }
public void setMaxRetryDays(int retryDays) { this.maxRetryDays = retryDays; }
public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; }
public void setRetryIntervalMinutes(int retryIntervalMinutes) { this.retryIntervalMinutes = retryIntervalMinutes; }
protected ScheduledExecutorService scheduler;
protected ScheduledFuture> scheduledTask;
public int getActCount() { return execInfo.actCount.get(); }
public int getProcCount() { return execInfo.procCount.get(); }
public int getDelCount() { return execInfo.delCount.get(); }
}
/** FileQueueException.java **/
package com.stimulus.util;
import java.io.Serializable;
import org.apache.commons.logging.*;
public class FileQueueException extends Exception implements Serializable {
private static final long serialVersionUID = -8353177202907981954L;
public FileQueueException(String message, Log logger) {
super(message);
logger.error(message);
}
public FileQueueException(String message, Throwable cause, Log logger) {
super(message,cause);
logger.error(message);
}
public FileQueueException(String message) {
super(message);
}
public String getMessage() { return super.getMessage(); }
}
/** FileQueueItem.java */
package com.stimulus.util;
import java.text.DecimalFormat;
import java.util.Date;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public abstract class FileQueueItem {
public enum Status { ACTIVE, DELETED, PROCESSING };
public long pointer = 0;
public Date receivedDate = null;
public Date lastRetryDate = null;
public int retries = 0;
protected static Log logger = LogFactory.getLog(FileQueueItem.class);
protected Status status = Status.ACTIVE;
public FileQueueItem() {
this.receivedDate = new Date();
this.lastRetryDate = new Date(0);
}
public abstract String getValue();
public abstract void setValue(String value) throws FileQueueException;
protected String getStatusStr() {
if (status==Status.ACTIVE) {
return "A";
} else if (status==Status.DELETED) {
return "D";
}
return "P";
}
protected Status parseStatus(String status) throws FileQueueException {
if (status.trim().equals("A")) {
return Status.ACTIVE;
} else if (status.trim().equals("D")) {
return Status.DELETED;
} else if (status.trim().equals("P")) {
return Status.PROCESSING;
}
throw new FileQueueException("failed to parse status {status='"+status+"'}");
}
public void parse(String line) throws FileQueueException {
String[] components = line.split("\\|");
if (components.length==5) {
String status = components[0];
String receiveDate = components[1];
String lastRetryDate = components[2];
String retries = components[3];
String value = components[4];
if (status==null || value==null || receiveDate==null || lastRetryDate==null || retries==null || value==null) {
throw new FileQueueException("null value found in file queue record parse");
}
// logger.debug("this is the value : "+value);
//if (de)
setStatus(parseStatus(status));//deleted.equals("D"));
try {
setReceivedDate(receiveDate);
} catch (Exception ae) {
throw new FileQueueException("failed to set received date:"+ae.getMessage(),ae,logger);
}
setRetries(Integer.parseInt(retries));
try {
setLastRetryDate(lastRetryDate);
} catch (Exception ae) {
throw new FileQueueException("failed to set last retry date:"+ae.getMessage(),ae,logger);
}
try {
setValue(value);
} catch ( FileQueueException ae) {
throw new FileQueueException("failed to set value:"+ae.getMessage(),ae,logger);
}
} else {
throw new FileQueueException("no match found. line:"+line,logger);
}
}
public String toString() {
return getStatusStr() +"|" + DateUtil.convertDatetoString(receivedDate,DateUtil.minuteFormatFast) + "|" +
DateUtil.convertDatetoString(lastRetryDate,DateUtil.minuteFormatFast) + "|" +
new DecimalFormat("00000").format(retries) + "|" + getValue();
}
public void setPointer(int pointer) {
this.pointer = pointer;
}
public long getPointer() {
return pointer;
}
public int getRetries() {
return retries;
}
public Date getReceivedDate() {
return receivedDate;
}
public void setStatus(Status status) {
this.status = status;
}
public Status getStatus() {
return status;
}
public void setRetries(int retries) {
this.retries = retries;
}
public void incRetries() {
retries++;
}
public String getReceivedDateStr() {
return DateUtil.convertDatetoString(receivedDate,DateUtil.minuteFormatFast);
}
public void setReceivedDate(String date) throws Exception {
receivedDate = DateUtil.convertStringToDate(date, DateUtil.minuteFormat);
}
public String getLastRetryDateStr() {
return DateUtil.convertDatetoString(lastRetryDate,DateUtil.minuteFormatFast);
}
public void setLastRetryDate(String date) throws Exception {
lastRetryDate = DateUtil.convertStringToDate(date, DateUtil.minuteFormat);
}
public void setLastRetryDate(Date date) {
lastRetryDate = date;
}
public Date getLastRetryDate() {
return lastRetryDate;
}
}
/* FileQueueTest.java */
import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import com.stimulus.util.FileQueue;
import com.stimulus.util.FileQueueException;
import com.stimulus.util.FileQueueItem;
public class FileQueueTest {
static ReentrantLock countLock = new ReentrantLock();
static int pCount = 0;
/**
* @param args
*/
public static void main(String[] args) {
System.out.println("queueing...");
try {
FileQueueItem exitReq = null;
TestQueue testQueue = new TestQueue("test");
testQueue.startQueueProcessor(10, TimeUnit.MILLISECONDS);
FileQueueItem[] queue = new FileQueueItem[100000];
for (int j = 0 ; j < 1000; j ++) {
queue[j] = new TestItem(j+1);
}
for (int j =0 ; j < 1000; j ++) {
// try { Thread.sleep(10); } catch (Exception e) {}
testQueue.queueItem(queue[j]);
//int x = 10 + (int)(Math.random()*2000);
//Thread.sleep(x);
}
Thread.sleep(10000);
System.out.println("activeCount:"+testQueue.getActCount());
System.out.println("procCount:"+testQueue.getProcCount());
System.out.println("delCount:"+testQueue.getDelCount());
System.out.println("process count:"+pCount);
testQueue.stopQueueProcessor();
} catch (Throwable t) {
t.printStackTrace();
}
}
public static class TestItem extends FileQueueItem {
String value;
public TestItem() {
}
public TestItem(Integer c) {
value = "item:"+c;
}
public String getValue() {
return value;
}
public void setValue(String value) throws FileQueueException {
this.value = value;
}
}
public static class TestQueue extends FileQueue {
public TestQueue(String name) {
super(name);
setRetryIntervalMinutes(1);
}
@Override
public File getQueueFile() {
return new File("/Users/jamie/system.queue");
}
@Override
public ProcessResult processFileQueueItem(FileQueueItem item) {
try {
countLock.lock();
pCount++;
System.out.println("process:"+pCount);
if (pCount==901) {
System.out.println("GOAL!");
}
System.out.println("activeCount:"+getActCount());
System.out.println("procCount:"+getProcCount());
System.out.println("delCount:"+getDelCount());
} finally {
countLock.unlock();
}
return ProcessResult.PROCESS_SUCCESS;
}
@Override
public void expiredItem(FileQueueItem item) {
}
@Override
public FileQueueItem newFileQueueItem() {
return new TestItem();
}
}
}
Read more: http://feeds.dzone.com/~r/dzone/snippets/~3/xVH4cT9Y6EM/13875