Thread: about monitoring the input stream
Following up from http://archives.postgresql.org/pgsql-jdbc/2006-07/msg00052.php I wonder if anything has been done on the subject of monitoring the number of bytes received since the last query?

I've finally dug into the code and, through reflection, got my hands on the InputStream encapsulated in the PGStream. The problem is that the stream is emptied frequently and I can't find the class responsible for reading it out, so my readings of the available() bytes (in a monitoring thread) tremendously underestimate the number of bytes actually piped.

I understand there is only one InputStream per connection, encapsulated inside a PGStream, created in the ProtocolConnectionImpl. Being able to monitor the number of bytes fetched for each query would be really nice!

Any help appreciated.

Albert

--
Albert Cardona
Molecular Cell Developmental Biology
University of California Los Angeles
Tel +1 310 2067376
Programming: http://www.ini.unizh.ch/~acardona/trakem2.html
Research: http://www.mcdb.ucla.edu/Research/Hartenstein/
Web design: http://www.pixelets.com
Albert Cardona <acardona@ini.phys.ethz.ch> writes:
> Following up from
> http://archives.postgresql.org/pgsql-jdbc/2006-07/msg00052.php
> I wonder if anything has been done on the subject of monitoring the amount of
> bytes since the last query?
>
> Finally I've dug into the code and put my hands through reflection on the
> InputStream encapsulated in the PGStream. Problem is, the stream is emptied
> frequently and I can't find the class responsible for reading it out, so my
> readings of the available() bytes (in a monitoring thread) underestimate
> tremendously the amount of bytes actually piped.

Why don't you decorate the InputStream with a simple CountingInputStream of your own, just like Oliver suggested? Something very similar to the private method java.io.DataOutputStream#incCount().

If you are lazy you could even extend BufferedInputStream; that way you have just two read methods to override.
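The decorator suggested here can be sketched as follows. This is a minimal illustration and not code from the driver or from TrakEM2: the class name CountingInputStream is taken from the suggestion, while getCount() and the CountingDemo helper are made up for the example. It extends FilterInputStream rather than BufferedInputStream, which is one possible choice; either way only two read methods need overriding.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Decorator that counts the bytes actually handed to the caller. */
class CountingInputStream extends FilterInputStream {
    private long count = 0;

    CountingInputStream(InputStream in) {
        super(in);
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        // read() returns the byte value (0-255) or -1 at end of stream,
        // so count one byte per successful call instead of adding b.
        if (b != -1) count++;
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len);
        if (n > 0) count += n; // n is -1 at end of stream
        return n;
    }

    // read(byte[]) is deliberately not overridden: FilterInputStream
    // implements it as read(buf, 0, buf.length), which lands above.

    long getCount() {
        return count;
    }
}

/** Hypothetical usage: read a 10-byte in-memory stream in three ways. */
class CountingDemo {
    static long run() {
        try (CountingInputStream in =
                 new CountingInputStream(new ByteArrayInputStream(new byte[10]))) {
            in.read();                  // 1 byte
            in.read(new byte[4]);       // 4 bytes
            in.read(new byte[8], 0, 8); // only 5 bytes remain
            in.read();                  // end of stream, not counted
            return in.getCount();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Bytes discarded through skip() are not counted in this sketch; whether that matters depends on how the wrapped stream is consumed.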
Marc,

> Why don't you decorate the InputStream with some simple
> CountingInputStream of yours, just like Oliver suggested?
> Something very similar to:
> private java.io.DataOutputStream#incCount()
> If you are lazy you could even extend BufferedInputStream; like this
> you have just two read methods to override.

The idea is not to modify the driver one bit, for the sake of uncontroversial deployment of my application (TrakEM2 at http://www.ini.unizh.ch/~acardona/trackem2.html ). If Java were Lisp I would simply alter the register to replace the InputStream, but it isn't, and/or I don't know how to use reflection to that extent (I don't know what would happen were I to replace the InputStream using reflection once a connection has been created).

Therefore I ask for suggestions on how to monitor the download rate... If there aren't any, well, tough luck! I'll have to live with it.

Albert
If you're trying to add diagnostic data collection to all sockets, you could provide a custom SocketFactory implementation that returns wrappers around Sockets whose getInputStream() method returns CountingInputStream objects instead of regular InputStream objects.

I don't think that the PG JDBC driver allows you to specify a custom socketFactory, but you could always set the default system socket factory. If you do other network operations in the same JVM, your factory could be smart enough to ignore sockets that don't connect to your PG server/port.

-- Mark Lewis

On Wed, 2006-09-13 at 18:09 +0200, Albert Cardona wrote:
> Therefore I ask for suggestions on how to monitor the downloading rate ...
> If there aren't any, well, tough luck! I'll have to live with it.
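A rough sketch of the socket-wrapping idea, under several assumptions: CountingSocket, CountingSocketFactory and SocketCountDemo are hypothetical names, the factory only implements the two simple createSocket overloads, and the demo talks to a throwaway loopback server rather than a PostgreSQL backend. How such a factory would be handed to the driver is not shown, since the message itself doubts the driver accepts one.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.SocketFactory;

/** Socket whose input stream adds every byte read to a shared counter. */
class CountingSocket extends Socket {
    private final AtomicLong counter;

    CountingSocket(InetAddress host, int port, AtomicLong counter) throws IOException {
        super(host, port);
        this.counter = counter;
    }

    @Override
    public InputStream getInputStream() throws IOException {
        return new FilterInputStream(super.getInputStream()) {
            @Override public int read() throws IOException {
                int b = super.read();
                if (b != -1) counter.incrementAndGet();
                return b;
            }
            @Override public int read(byte[] buf, int off, int len) throws IOException {
                int n = super.read(buf, off, len);
                if (n > 0) counter.addAndGet(n);
                return n;
            }
        };
    }
}

/** Factory handing out counting sockets; local-bind overloads are left out. */
class CountingSocketFactory extends SocketFactory {
    final AtomicLong bytesRead = new AtomicLong();

    @Override public Socket createSocket(String host, int port) throws IOException {
        return new CountingSocket(InetAddress.getByName(host), port, bytesRead);
    }
    @Override public Socket createSocket(InetAddress host, int port) throws IOException {
        return new CountingSocket(host, port, bytesRead);
    }
    @Override public Socket createSocket(String host, int port,
                                         InetAddress localHost, int localPort) {
        throw new UnsupportedOperationException("omitted in this sketch");
    }
    @Override public Socket createSocket(InetAddress host, int port,
                                         InetAddress localHost, int localPort) {
        throw new UnsupportedOperationException("omitted in this sketch");
    }
}

/** Hypothetical usage against a local server that sends four bytes. */
class SocketCountDemo {
    static long run() {
        CountingSocketFactory factory = new CountingSocketFactory();
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = factory.createSocket(loopback, server.getLocalPort());
             Socket peer = server.accept()) {
            OutputStream out = peer.getOutputStream();
            out.write(new byte[] {1, 2, 3, 4});
            out.flush();
            InputStream in = client.getInputStream();
            byte[] buf = new byte[4];
            int got = 0;
            while (got < buf.length) {
                int n = in.read(buf, got, buf.length - got);
                if (n < 0) break;
                got += n;
            }
            return factory.bytesRead.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A factory like this could also inspect the host and port and return a plain Socket for anything that is not the database server, as suggested above.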
Albert Cardona <acardona@ini.phys.ethz.ch> writes:
> The idea is not to modify the driver one bit in benefit of uncontroversial
> deployment of my application (TrakEM2 at
> http://www.ini.unizh.ch/~acardona/trackem2.html ). If java was lisp I would
> simply alter the register to replace the InputStream, but it isn't and/or I
> don't know how to use reflection to that extent

Thanks to its reflection API, Java is actually Lisp. Just a bit less convenient. This should do the trick:

    // Field is java.lang.reflect.Field; pg_stream_to_hack is the PGStream instance
    Field pg_input_field = PGStream.class.getDeclaredField("pg_input");
    pg_input_field.setAccessible(true);
    InputStream orig_stream = (InputStream) pg_input_field.get(pg_stream_to_hack);
    InputStream counting_stream = new CountingInputStream(orig_stream);
    pg_input_field.set(pg_stream_to_hack, counting_stream);

> (I don't know what would happen was I to replace the InputStream
> using reflection once a connection has been created).

As far as I understand the driver, you _have to_ replace it after the connection has been created. Just avoid using the connection before replacing the stream.
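For readers without the driver sources at hand, the same field swap can be tried on a stand-in class. Everything here is hypothetical except the reflection calls themselves: FakePgStream only imitates a class holding a private pg_input field, and Counter is a throwaway counting wrapper.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;

/** Hypothetical stand-in for a driver class with a private stream field. */
class FakePgStream {
    private InputStream pg_input = new ByteArrayInputStream(new byte[6]);

    int receiveChar() throws IOException {
        return pg_input.read();
    }
}

class StreamSwapDemo {
    /** Throwaway counting wrapper used only for this demonstration. */
    static class Counter extends FilterInputStream {
        long count = 0;

        Counter(InputStream in) {
            super(in);
        }

        @Override public int read() throws IOException {
            int b = super.read();
            if (b != -1) count++;
            return b;
        }

        @Override public int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len);
            if (n > 0) count += n;
            return n;
        }
    }

    static long run() {
        try {
            FakePgStream target = new FakePgStream();

            // Reach the private field and make it writable from here.
            Field field = FakePgStream.class.getDeclaredField("pg_input");
            field.setAccessible(true);

            // Wrap the original stream and put the wrapper in its place.
            Counter counter = new Counter((InputStream) field.get(target));
            field.set(target, counter);

            // From now on the owner reads through the wrapper unknowingly.
            target.receiveChar();
            target.receiveChar();
            target.receiveChar();
            return counter.count;
        } catch (ReflectiveOperationException | IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The swap only affects reads made after field.set(); anything the owner read earlier, or buffered in an inner stream it still references elsewhere, goes uncounted.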
Thank you very much Mark! I have my setup up and running. It didn't occur to me to "simply" wrap the BufferedInputStream of the PGStream into my own custom counter stream. Very easy and very clean. Thanks!

Albert
Albert Cardona <acardona@ini.phys.ethz.ch> writes:
> Thank you very much Mark! I have my setup up and running.

Good news, that's great and rewarding for those who helped!

Albert: would you share your code? I think it is of general interest.

To the maintainers: could Albert's code be made available in some "contrib" subdirectory of the CVS repository?

Albert: if not, could you at least post it on the list so it is archived and available somewhere?

Cheers,
Marc.
Mark,

The relevant parts of the code for monitoring the PGStream are attached below.

This code is part of TrakEM2, an ImageJ/PostgreSQL-based application (GPL applies) for managing an arbitrarily large set of images, segmented profiles and metadata in general, for the purpose of extracting 3D models and a hierarchical structure of objects present in the sample represented by the images. See all details here: http://www.ini.unizh.ch/~acardona/trakem2.html

The code below belongs to the current svn snapshot, which won't be available until the end of this month.

/** Extract from private inner class Monitor. GPL applies, see the TrakEM2-src.zip file, class ini.trakem2.persistence.DBLoader, at http://www.ini.unizh.ch/~acardona/trakem2.html */

public Monitor(Connection con) {
    connection = con;
    LoggingInputStream lis = null;
    try {
        AbstractJdbc2Connection a2 = (AbstractJdbc2Connection)connection;
        Class c2 = connection.getClass().getSuperclass().getSuperclass();
        java.lang.reflect.Field f_proto = c2.getDeclaredField("protoConnection");
        f_proto.setAccessible(true);
        // protoConnection is a ProtocolConnection interface, implemented in core.v3.ProtocolConnectionImpl !
        //ProtocolConnectionImpl pci = (ProtocolConnectionImpl)m_proto.get(c2); // class is private to the package, can't cast!
        Object pci = f_proto.get(a2);
        // finally, get the PGStream
        java.lang.reflect.Field f_pgstream = pci.getClass().getDeclaredField("pgStream");
        f_pgstream.setAccessible(true);
        PGStream pgstream = (PGStream)f_pgstream.get(pci);
        // now the InputStream
        java.lang.reflect.Field f_i = pgstream.getClass().getDeclaredField("pg_input");
        f_i.setAccessible(true);
        InputStream stream = (InputStream)f_i.get(pgstream);
        lis = new LoggingInputStream(stream);
        f_i.set(pgstream, lis); // TADA! Many thanks to the PGSQL JDBC mailing list for this last tip on not just monitoring the PGStream as I was doing, but on replacing the inputstream altogether with a logging copy! ("CountingInputStream", they called it).
    } catch (Exception e) {
        new IJError(e);
    }
    this.lis = lis;
    makeWindow();
}

/** ===================== */

/** The class below exists as ini.trakem2.io.LoggingInputStream in TrakEM2. The GPL applies, see the TrakEM2-src.zip downloadable at http://www.ini.unizh.ch/~acardona/trakem2.html */

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.io.IOException;

/** A class to monitor an input stream for speed and total byte download. */
public class LoggingInputStream extends BufferedInputStream {

    private long last;
    private long n = 0;
    private long accum_time = 0;
    private long accum_bytes = 0;

    public LoggingInputStream(InputStream in) {
        super(in);
        last = System.currentTimeMillis();
    }

    public int read() throws IOException {
        int m = super.read();
        // read() returns the byte value (0-255) or -1 at end of stream, so count one byte per successful call
        if (m != -1) n++;
        return m;
    }

    // read(byte[] b) is not overridden: the inherited version calls read(b, 0, b.length), so counting in both would count every byte twice.

    public int read(byte[] b, int off, int len) throws IOException {
        int m = super.read(b, off, len);
        // m is -1 at end of stream and must not be subtracted from the count
        if (m > 0) n += m;
        return m;
    }

    /** Put the accumulated count to zero. */
    public void resetInfo() { // to work perfectly, this would need a synchronized clause, but no such perfection is needed, and there are performance issues.
        accum_bytes = n = 0;
        last = System.currentTimeMillis();
        accum_time = 0;
    }

    /** Returns info as
     * [0] = current time in ms
     * [1] = elapsed time in ms since last call to getInfo(long[])
     * [2] = n_bytes_read since last call to getInfo(long[])
     * [3] = accumulated time in ms since last call to resetInfo()
     * [4] = accumulated bytes since last call to resetInfo()
     *
     * So current speed = info[2]/info[1] kB/s
     */
    public void getInfo(long[] info) {
        long now = System.currentTimeMillis();
        accum_time += now - last;
        accum_bytes += n;
        info[0] = now;
        info[1] = now - last; // elapsed time
        info[2] = n;
        info[3] = accum_time; // total time since last call to resetInfo()
        info[4] = accum_bytes; // total bytes since last call to resetInfo()
        // reset cycle vars:
        n = 0;
        last = now;
    }
}

Albert
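The speed figure described in the getInfo() comment, bytes since the last sample divided by elapsed milliseconds, can also be sketched in isolation. RateSampler and RateDemo are hypothetical names, not part of TrakEM2. The sketch passes timestamps in explicitly so the arithmetic is easy to follow, uses floating-point division, and guards against a zero elapsed time, two cases the plain info[2]/info[1] expression leaves to the caller.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Turns a running byte total into a per-interval transfer rate. */
class RateSampler {
    // Updated by the thread that reads the stream, polled by the monitor.
    private final AtomicLong total = new AtomicLong();
    private long lastTotal = 0;
    private long lastMillis;

    RateSampler(long startMillis) {
        lastMillis = startMillis;
    }

    /** Called by a counting stream after each successful read. */
    void add(long bytes) {
        total.addAndGet(bytes);
    }

    /**
     * Returns bytes per millisecond since the previous sample, which is
     * numerically the same as kB/s when 1 kB is taken as 1000 bytes.
     */
    double sample(long nowMillis) {
        long current = total.get();
        long elapsed = nowMillis - lastMillis;
        double rate = elapsed > 0 ? (double) (current - lastTotal) / elapsed : 0.0;
        lastTotal = current;
        lastMillis = nowMillis;
        return rate;
    }
}

/** Hypothetical usage with hand-picked timestamps. */
class RateDemo {
    static double[] run() {
        RateSampler sampler = new RateSampler(0);
        sampler.add(2000);
        sampler.add(3000);
        double first = sampler.sample(1000);  // 5000 bytes in 1000 ms
        sampler.add(500);
        double second = sampler.sample(1500); // 500 bytes in 500 ms
        return new double[] {first, second};
    }
}
```

In a real monitor the timestamps would come from System.currentTimeMillis() on the polling thread; the AtomicLong is one way to avoid the unsynchronized counter update mentioned in the resetInfo() comment.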
Hi Albert,

Could you release your code under a FreeBSD-like license? I'm afraid that we can't accept any GPL'd code. I'd greatly love to see your code included.

Dave

On 16-Sep-06, at 6:08 AM, Albert Cardona wrote:
> The relevant parts of the code for monitoring the PGStream are
> attached below.
Dave,

My employer (Institute of Neuroinformatics) has a policy of GPLing all software produced in house, about which I can't do anything at all (and anyway I agree).

In any case, you can replicate the functionality without even looking at my code; it's just a reflection hack.

Albert