Thread: about monitoring the input stream

about monitoring the input stream

From
Albert Cardona
Date:
Following up from
http://archives.postgresql.org/pgsql-jdbc/2006-07/msg00052.php
I wonder if anything has been done on the subject of monitoring the amount of
bytes since the last query?

Finally I've digged into the code and put my hands through reflection on the
InputStream encapsulated in the PGStream. Problem is, the stream is emptied
frequently and I can't find the class responsible for reading it out, so my
readings of the available() bytes (in a monitoring thread) underestimate
tremendously the amount of bytes actually piped.

I understand there is only one InputStream per connection, encapsulated inside
a PGStream, created in the ProtocolConnectionImpl. Being able to monitor its
the amount of bytes fetched for each query would be really nice!

Any help appreciated.

Albert


--
Albert Cardona
Molecular Cell Developmental Biology
University of California Los Angeles
Tel +1 310 2067376
Programming: http://www.ini.unizh.ch/~acardona/trakem2.html
Research: http://www.mcdb.ucla.edu/Research/Hartenstein/
Web design: http://www.pixelets.com


Re: about monitoring the input stream

From
Marc Herbert
Date:
Albert Cardona <acardona@ini.phys.ethz.ch> writes:

> Following up from
> http://archives.postgresql.org/pgsql-jdbc/2006-07/msg00052.php
> I wonder if anything has been done on the subject of monitoring the amount of
> bytes since the last query?
>
> Finally I've digged into the code and put my hands through reflection on the
> InputStream encapsulated in the PGStream. Problem is, the stream is emptied
> frequently and I can't find the class responsible for reading it out, so my
> readings of the available() bytes (in a monitoring thread) underestimate
> tremendously the amount of bytes actually piped.

Why don't you decorate the InputStream with some simple
CountingInputStream of yours, just like Oliver suggested?

Something very similar to:
 private java.io.DataOutputStream#incCount()

If you are lazy you could even extend BufferInputStream; like this
you have just two read methods to override.


Re: about monitoring the input stream

From
Albert Cardona
Date:
Marc,

> Why don't you decorate the InputStream with some simple
> CountingInputStream of yours, just like Oliver suggested?

> Something very similar to:
> private java.io.DataOutputStream#incCount()

> If you are lazy you could even extend BufferInputStream; like this
> you have just two read methods to override.


The idea is not to modify the driver one bit in benefit of uncontroversial
deployment of my application (TrakEM2 at
http://www.ini.unizh.ch/~acardona/trackem2.html ). If java was lisp I would
simply alter the register to replace the InputStream, but it isn't and/or I
don't know how to use reflection to that extent (I don't know what would
happen was I to replace the InputStream using reflection once a connection
has been created).

Therefore I ask for suggestions on how to monitor the downloading rate ...
If there aren't any, well, tough luck! I'll have to live with it.

Albert




Re: about monitoring the input stream

From
Mark Lewis
Date:
If you're trying to add diagnostic data collection to all sockets you
could provide a custom SocketFactory implementation which returned
wrappers around Sockets that return CountingInputStream objects instead
of regular InputStream objects to the getInputStream() method.

I don't think that the PG JDBC driver allows you to specify a custom
socketFactory, but you could always set the default system socket
factory.  If you do other network operations in the same JVM, your
factory could be smart enough to ignore sockets that don't connect to
your PG server/port.

-- Mark Lewis


On Wed, 2006-09-13 at 18:09 +0200, Albert Cardona wrote:
> Marc,
>
> > Why don't you decorate the InputStream with some simple
> > CountingInputStream of yours, just like Oliver suggested?
>
> > Something very similar to:
> > private java.io.DataOutputStream#incCount()
>
> > If you are lazy you could even extend BufferInputStream; like this
> > you have just two read methods to override.
>
>
> The idea is not to modify the driver one bit in benefit of uncontroversial
> deployment of my application (TrakEM2 at
> http://www.ini.unizh.ch/~acardona/trackem2.html ). If java was lisp I would
> simply alter the register to replace the InputStream, but it isn't and/or I
> don't know how to use reflection to that extent (I don't know what would
> happen was I to replace the InputStream using reflection once a connection
> has been created).
>
> Therefore I ask for suggestions on how to monitor the downloading rate ...
> If there aren't any, well, tough luck! I'll have to live with it.
>
> Albert
>
>
>
>
> ---------------------------(end of broadcast)---------------------------
> TIP 5: don't forget to increase your free space map settings

Re: about monitoring the input stream

From
Marc Herbert
Date:
Albert Cardona <acardona@ini.phys.ethz.ch> writes:

> The idea is not to modify the driver one bit in benefit of uncontroversial
> deployment of my application (TrakEM2 at
> http://www.ini.unizh.ch/~acardona/trackem2.html ). If java was lisp I would
> simply alter the register to replace the InputStream, but it isn't and/or I
> don't know how to use reflection to that extent

Thanks to its reflection API, java is actually lisp. Just a bit less
convenient. This should do the trick:

      pg_input_field = PGStream.class.getDeclaredField("pg_input");

      pg_input_field.setAccessible(true);

      InputStream orig_stream = (InputStream) pg_input_field.get(pg_stream_to_hack);

      InputStream counting_stream = new CountingInputStream(orig_stream);

      pg_input_field.set(pg_stream_to_hack, counting_stream);



> (I don't know what would happen was I to replace the InputStream
> using reflection once a connection has been created).

As far as I understand the driver you _have to_ replace it after the
connection has been created. Just avoid using it before replace.




Re: about monitoring the input stream

From
Albert Cardona
Date:
Thank you very much Mark! I have my setup up and running. It didn't occur to
me to "simply" wrap the BufferedInputStream of the PGStream into my own
custom counter stream.

Very easy and very clean. Thanks!

Albert

--
Albert Cardona
Molecular Cell Developmental Biology
University of California Los Angeles
Tel +1 310 2067376
Programming: http://www.ini.unizh.ch/~acardona/trakem2.html
Research: http://www.mcdb.ucla.edu/Research/Hartenstein/
Web design: http://www.pixelets.com


Re: about monitoring the input stream

From
Marc Herbert
Date:
Albert Cardona <acardona@ini.phys.ethz.ch> writes:

> Thank you very much Mark! I have my setup up and running.

Good news, that's great and rewarding for those who helped!

Albert: would you share your code? I think it is of general interest.

To the maintainers: could Albert's code made available in some
"contrib" subdirectory of the CVS repository?

Albert: if not, could you at least post it on the list so it is
archived and available somewhere?

Cheers,

Marc.




Re: about monitoring the input stream

From
Albert Cardona
Date:
Mark,

The relevant parts of the code for monitoring the PGStream are attached below.

This code is part of TrakEM2, an ImageJ/postgresql -based application (GPL
applies) for managing an arbitrarily large set of images, segmented profiles
and metadata in general, for the purpose of extracting 3D models and a
hierarchical structure of objects present in the sample represented by the
images. See all details here:
http://www.ini.unizh.ch/~acardona/trakem2.html

The code below belongs to the current svn snapshot, which won't be available
until the end of this month.


/** Extract from private inner class Monitor. GPL applies, see the
TrakEM2-src.zip file, class ini.trakem2.persistence.DBLoader, at
http://www.ini.unizh.ch/~acardona/trakem2.html */

        public Monitor(Connection con) {
                connection = con;
                LoggingInputStream lis = null;
                try {
                        AbstractJdbc2Connection a2 =
(AbstractJdbc2Connection)connection;
                        Class c2 =
connection.getClass().getSuperclass().getSuperclass();
                        java.lang.reflect.Field f_proto =
c2.getDeclaredField("protoConnection");
                        f_proto.setAccessible(true);
                        // protoConnection is a ProtocolConnection interface,
implemented in core.v3.ProtocolConnectionImpl !
                        //ProtocolConnectionImpl pci =
(ProtocolConnectionImpl)m_proto.get(c2); // class is private to the package,
can't cast!
                        Object pci = f_proto.get(a2);
                        // finally, get the PGStream
                        java.lang.reflect.Field f_pgstream =
pci.getClass().getDeclaredField("pgStream");
                        f_pgstream.setAccessible(true);
                                            
                        PGStream pgstream = (PGStream)f_pgstream.get(pci);
                        // now the InputStream
                        java.lang.reflect.Field f_i =
pgstream.getClass().getDeclaredField("pg_input");
                        f_i.setAccessible(true);
                        InputStream stream = (InputStream)f_i.get(pgstream);
                        lis = new LoggingInputStream(stream);
                        f_i.set(pgstream, lis); // TADA! Many thanks to the
PGSQL JDBC mailing list for this last tip on not just monitoring the PGStream
as I was doing, but on replacing the inputstream altogether with a logging
copy! ("CountingInputStream", they called it).

                } catch (Exception e) {
                        new IJError(e);
                }
                this.lis = lis;
                makeWindow();
        }

/** ===================== */

/** The class below exists as ini.trakem2.io.LoggingInputStream in TrakEM2.
The GPL applies, see the TrakEM2-src.zip downloadable at
http://www.ini.unizh.ch/~acardona/trakem2.html */

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.io.IOException;


/** A class to monitor an input stream for speed and total byte download. */
public class LoggingInputStream extends BufferedInputStream {

        private long last;
        private long n = 0;
        private long accum_time = 0;
        private long accum_bytes = 0;

        public LoggingInputStream(InputStream in) {
                super(in);
                last = System.currentTimeMillis();
        }

        public int read() throws IOException {
                int m = super.read();
                n += m;
                return m;
        }

        public int read(byte[] b) throws IOException {
                int m = super.read(b);
                n += m;
                return m;
        }

        public int read(byte[] b, int off, int len) throws IOException {
                int m = super.read(b, off, len);
                n += m;
                return m;
        }

        /** Put the accumulated count to zero. */
        public void resetInfo() { // to work perfect, this would need a
synchronized clause, but no such perfection is needed, and there are
perfomance issues.
                accum_bytes = n = 0;
                last = System.currentTimeMillis();
                accum_time = 0;
        }

        /** Returns info as
        * [0] = current time in ms
        * [1] = elapsed time in ms since last call to getInfo(long[])
        * [2] = n_bytes_read since last call to getInfo(long[])
        * [3] = accumulated time in ms since last call to resetInfo()
        * [4] = accumulated bytes since last call to resetInfo()
        *
        * So current speed = info[2]/info[1] Kb/s
        */
        public void getInfo(long[] info) {
                long now = System.currentTimeMillis();
                accum_time += now - last;
                accum_bytes += n;
                info[0] = now;
                info[1] = now - last; // elapsed time
                info[2] = n;
                info[3] = accum_time; // total time since last call to
resetInfo()
                info[4] = accum_bytes; // total bytes since last call to
resetInfo()
                // reset cycle vars:
                n = 0;
                last = now;
        }
}


--
Albert Cardona
Molecular Cell Developmental Biology
University of California Los Angeles
Tel +1 310 2067376
Programming: http://www.ini.unizh.ch/~acardona/trakem2.html
Research: http://www.mcdb.ucla.edu/Research/Hartenstein/
Web design: http://www.pixelets.com



Re: about monitoring the input stream

From
Dave Cramer
Date:
Hi Albert,

Could you release your code with a freebsd like license. I'm afraid
that we can't accept any GPL'd code.

I'd greatly love to see your code included.

Dave

On 16-Sep-06, at 6:08 AM, Albert Cardona wrote:

>
> Mark,
>
> The relevant parts of the code for monitoring the PGStream are
> attached below.
>
> This code is part of TrakEM2, an ImageJ/postgresql -based
> application (GPL
> applies) for managing an arbitrarily large set of images, segmented
> profiles
> and metadata in general, for the purpose of extracting 3D models and a
> hierarchical structure of objects present in the sample represented
> by the
> images. See all details here:
> http://www.ini.unizh.ch/~acardona/trakem2.html
>
> The code below belongs to the current svn snapshot, which won't be
> available
> until the end of this month.
>
>
> /** Extract from private inner class Monitor. GPL applies, see the
> TrakEM2-src.zip file, class ini.trakem2.persistence.DBLoader, at
> http://www.ini.unizh.ch/~acardona/trakem2.html */
>
>         public Monitor(Connection con) {
>                 connection = con;
>                 LoggingInputStream lis = null;
>                 try {
>                         AbstractJdbc2Connection a2 =
> (AbstractJdbc2Connection)connection;
>                         Class c2 =
> connection.getClass().getSuperclass().getSuperclass();
>                         java.lang.reflect.Field f_proto =
> c2.getDeclaredField("protoConnection");
>                         f_proto.setAccessible(true);
>                         // protoConnection is a ProtocolConnection
> interface,
> implemented in core.v3.ProtocolConnectionImpl !
>                         //ProtocolConnectionImpl pci =
> (ProtocolConnectionImpl)m_proto.get(c2); // class is private to the
> package,
> can't cast!
>                         Object pci = f_proto.get(a2);
>                         // finally, get the PGStream
>                         java.lang.reflect.Field f_pgstream =
> pci.getClass().getDeclaredField("pgStream");
>                         f_pgstream.setAccessible(true);
>                         PGStream pgstream = (PGStream)f_pgstream.get
> (pci);
>                         // now the InputStream
>                         java.lang.reflect.Field f_i =
> pgstream.getClass().getDeclaredField("pg_input");
>                         f_i.setAccessible(true);
>                         InputStream stream = (InputStream)f_i.get
> (pgstream);
>                         lis = new LoggingInputStream(stream);
>                         f_i.set(pgstream, lis); // TADA! Many
> thanks to the
> PGSQL JDBC mailing list for this last tip on not just monitoring
> the PGStream
> as I was doing, but on replacing the inputstream altogether with a
> logging
> copy! ("CountingInputStream", they called it).
>
>                 } catch (Exception e) {
>                         new IJError(e);
>                 }
>                 this.lis = lis;
>                 makeWindow();
>         }
>
> /** ===================== */
>
> /** The class below exists as ini.trakem2.io.LoggingInputStream in
> TrakEM2.
> The GPL applies, see the TrakEM2-src.zip downloadable at
> http://www.ini.unizh.ch/~acardona/trakem2.html */
>
> import java.io.BufferedInputStream;
> import java.io.InputStream;
> import java.io.IOException;
>
>
> /** A class to monitor an input stream for speed and total byte
> download. */
> public class LoggingInputStream extends BufferedInputStream {
>
>         private long last;
>         private long n = 0;
>         private long accum_time = 0;
>         private long accum_bytes = 0;
>
>         public LoggingInputStream(InputStream in) {
>                 super(in);
>                 last = System.currentTimeMillis();
>         }
>
>         public int read() throws IOException {
>                 int m = super.read();
>                 n += m;
>                 return m;
>         }
>
>         public int read(byte[] b) throws IOException {
>                 int m = super.read(b);
>                 n += m;
>                 return m;
>         }
>
>         public int read(byte[] b, int off, int len) throws
> IOException {
>                 int m = super.read(b, off, len);
>                 n += m;
>                 return m;
>         }
>
>         /** Put the accumulated count to zero. */
>         public void resetInfo() { // to work perfect, this would
> need a
> synchronized clause, but no such perfection is needed, and there are
> perfomance issues.
>                 accum_bytes = n = 0;
>                 last = System.currentTimeMillis();
>                 accum_time = 0;
>         }
>
>         /** Returns info as
>         * [0] = current time in ms
>         * [1] = elapsed time in ms since last call to getInfo(long[])
>         * [2] = n_bytes_read since last call to getInfo(long[])
>         * [3] = accumulated time in ms since last call to resetInfo()
>         * [4] = accumulated bytes since last call to resetInfo()
>         *
>         * So current speed = info[2]/info[1] Kb/s
>         */
>         public void getInfo(long[] info) {
>                 long now = System.currentTimeMillis();
>                 accum_time += now - last;
>                 accum_bytes += n;
>                 info[0] = now;
>                 info[1] = now - last; // elapsed time
>                 info[2] = n;
>                 info[3] = accum_time; // total time since last call to
> resetInfo()
>                 info[4] = accum_bytes; // total bytes since last
> call to
> resetInfo()
>                 // reset cycle vars:
>                 n = 0;
>                 last = now;
>         }
> }
>
>
> --
> Albert Cardona
> Molecular Cell Developmental Biology
> University of California Los Angeles
> Tel +1 310 2067376
> Programming: http://www.ini.unizh.ch/~acardona/trakem2.html
> Research: http://www.mcdb.ucla.edu/Research/Hartenstein/
> Web design: http://www.pixelets.com
>
>
>
> ---------------------------(end of
> broadcast)---------------------------
> TIP 3: Have you checked our extensive FAQ?
>
>                http://www.postgresql.org/docs/faq
>


Re: about monitoring the input stream

From
Albert Cardona
Date:
Dave,

My employer (Institute of Neuroinformatics) has a policy of GPLing all
software produced in house, with which I can't do anything at all (and anyway
I agree).

In any case, you can replicate the functionality not even looking at my code,
it's just a reflection hack.

Albert