package org.postgresql.core;

import java.io.*;

/**
 * Temporary hack to try to detect/avoid socket deadlocks caused
 * by blocking on write while we have lots of pending data to read
 * from the server (i.e. the server is also blocked on write).
 *
 * see the thread at http://archives.postgresql.org/pgsql-jdbc/2009-01/msg00045.php
 *
 * @author Oliver Jowett <oliver@opencloud.com>
 */
class AntiDeadlockStream extends OutputStream implements Runnable {
    private static final class BufferLock {}
    private final BufferLock bufferLock = new BufferLock();

    private final OutputStream wrapped;
    private final long flushTimeout;    

    private byte[] buffer;
    private int bufferSize;
    private byte[] swapBuffer;    

    private boolean closeRequest;
    private boolean flushRequest;
    private boolean closeComplete;

    private IOException failedException;

    AntiDeadlockStream(OutputStream wrapped, int initialSize, long flushTimeout) {
        this.wrapped = wrapped;
        this.flushTimeout = flushTimeout;
        this.buffer = new byte[initialSize];
        this.swapBuffer = new byte[initialSize];
        
        new Thread(this, "AntiDeadlock thread").start();
    }

    public void close() throws IOException {
        synchronized (bufferLock) {
            closeRequest = true;
            bufferLock.notifyAll();

            while (!closeComplete) {
                if (failedException != null)
                    throw (IOException) (new IOException("Write thread reported an error").initCause(failedException));
                
                try {
                    bufferLock.wait();
                } catch (InterruptedException ie) {
                    throw new InterruptedIOException();
                }
            }
        }
    }

    public void flush() throws IOException {
        synchronized (bufferLock) {
            long expiry = -1;

            flushRequest = true;                
            bufferLock.notifyAll();

            while (true) {
                if (failedException != null)
                    throw (IOException) (new IOException("Write thread reported an error").initCause(failedException));
                if (closeRequest)
                    throw new IOException("Stream is closed");
                if (bufferSize == 0)
                    return;

                long delay;
                if (expiry == -1) {
                    delay = flushTimeout;
                    expiry = System.currentTimeMillis() + delay;
                } else {
                    delay = expiry - System.currentTimeMillis();
                }

                if (delay <= 0) {
                    System.err.println("Warning: possible socket deadlock detected (timeout=" + flushTimeout + ", remaining buffer=" + bufferSize);
                    new Throwable("Deadlock call stack").fillInStackTrace().printStackTrace(System.err);
                    return;
                }

                try {
                    bufferLock.wait(delay);
                } catch (InterruptedException ie) {
                    throw new InterruptedIOException();
                }
            }
        }
    }

    public void write(int b) throws IOException {
        write(new byte[] { (byte)b }, 0, 1);
    }

    public void write(byte[] b) throws IOException {
        write(b, 0, b.length);
    }

    public void write(byte[] b, int off, int len) throws IOException {
        if (b == null)
            throw new NullPointerException();

        if (off < 0 || len < 0 || off+len > b.length)
            throw new IndexOutOfBoundsException();

        synchronized (bufferLock) {
            if (closeRequest)
                throw new IOException("Stream is closed");

            if (failedException != null)
                throw (IOException) (new IOException("Write thread reported an error").initCause(failedException));

            int needs = bufferSize + len;
            int newSize = buffer.length;
            while (newSize < needs)
                newSize *= 2;
                
            if (newSize != buffer.length) {
                byte[] newBuffer = new byte[newSize];
                System.arraycopy(buffer, 0, newBuffer, 0, bufferSize);
                buffer = newBuffer;
            }

            if (bufferSize == 0)
                bufferLock.notifyAll();

            System.arraycopy(b, off, buffer, bufferSize, len);
            bufferSize += len;
        }
    }

    //
    // Runnable
    //
   
    public void run() {
        while (true) {
            boolean doFlush;
            boolean doClose;
            int writeLength;

            synchronized (bufferLock) {
                if (bufferSize == 0 && !closeRequest && !flushRequest) {
                    try {
                        bufferLock.wait();
                    } catch (InterruptedException ie) {
                        failedException = new InterruptedIOException("write thread interrupted");
                        bufferLock.notifyAll();
                        return;
                    }

                    continue;
                }

                byte[] oldBuffer = buffer;
                buffer = swapBuffer;
                swapBuffer = buffer;

                writeLength = bufferSize;
                doFlush = flushRequest;
                doClose = closeRequest;
                
                flushRequest = false;
                
                bufferLock.notifyAll();
            }

            try {
                if (writeLength > 0)
                    wrapped.write(swapBuffer, 0, writeLength);
                if (flushRequest)
                    wrapped.flush();
                if (closeRequest) {
                    wrapped.close();

                    synchronized (bufferLock) {
                        closeComplete = true;
                        bufferLock.notifyAll();
                    }
                    
                    return;
                }
            } catch (IOException ioe) {
                synchronized (bufferLock) {
                    failedException = ioe;
                    bufferLock.notifyAll();
                    try {
                        wrapped.close();
                    } catch (IOException ioe2) {
                        // Ignore it.
                    }
                    return;
                }
            }
        }
    }
}
