Re: Small PosgreSQL locking function request - with bounty - Mailing list pgsql-general

From rob stone
Subject Re: Small PosgreSQL locking function request - with bounty
Msg-id 1378999007.4937.17.camel@roblaptop.virtua.com.br
In response to Small PosgreSQL locking function request - with bounty  (David Noel <david.i.noel@gmail.com>)
List pgsql-general

On Thu, 2013-09-12 at 06:40 -0500, David Noel wrote:
> I have a few database queries that I've been running from within a
> Java project. I have recently come to the understanding that I need to
> run them instead within the PostgreSQL server as stored functions. I
> have that understanding because I need to make use of locking
> functionality, and that seems only able to be done by means of
> PostgreSQL functions. Transactions don't seem to be able to provide
> this. I've never written functions for postgres, so I thought maybe
> someone here could help.
>
> To provide some context: the code is a part of a webcrawler. More
> specifically, it is a part of the queuing system that handles the
> management of URLs to be crawled. The system takes URLs from the
> queue with one query, and marks them as active with a second. It then
> sends the results on to the crawler. Once the crawler has crawled the
> URL, a third query removes the URL from the queue.
>
> The code is running concurrently in multiple threads on multiple
> servers, and in scaling it to multiple servers I've run into some
> problems. It seems that due to the way postgres is designed I am
> unable to lock tables, or utilize transactions in Java to achieve
> concurrency. So I need it instead to be run as a postgres
> function/stored procedure. It seems. Am I correct in this, or did I
> misread the PostgreSQL Transactions documentation?
>
> Assuming the only way to accomplish this is with a postgres function,
> would anyone care to implement this for a small reward (though
> relative to the amount of work required I'd say it's probably a decent
> to large reward)?
>
> The queries are as follows:
>
> String querySelect =
> "select \"URL\",\"PublishDate\",\"SiteName\",\"Classification\",\"Special\",\"NextCrawlDate\" " +
> "from \"crawlq\" " +
> "where \"Special\" = ? " +
> "AND \"Active\" = 'true' " +
> "AND \"TimeoutDate\" <= now() " +
> "AND \"CrawlError\" = 'false' " +
> "OR " +
> "\"Special\" = ? " +
> "AND \"Active\" = 'false' " +
> "AND \"CrawlError\" = 'false' " +
> "order by \"NextCrawlDate\" asc limit 1";
>
> String queryUpdateActive =
> "update \"crawlq\" " +
> "set \"Active\" = 'true', " +
> "\"TimeoutDate\" = now() + interval '5 minutes' " +
> "where \"URL\" = ? " ;
>
> This is what I need the function to do:
>
> I need the PostgreSQL function to first lock the table "crawlq".
> I then need it to perform the "querySelect" query.
> I then need it to perform the "queryUpdateActive" query.
> I then need it to unlock the table.
> I then need it to return the values from the select query to the Java project.
>
> Deliverables: I need the postgres function and a simple java program
> that calls the function and returns a result set. It doesn't need to
> do anything with the data, just call the function and return the value
> to the program. This should be a very simple project that shouldn't
> take more than 15 minutes for anyone familiar with writing postgres
> functions. Would $50 via PayPal be enough to entice anyone to offer a
> solution? I could also offer the payment in LiteCoins if you'd rather
> do it that way. Of course if you're feeling benevolent I wouldn't
> object to anyone who felt like doing it for free.
>
> The Java function I'm using that fetches elements from the queue
> currently is as follows:
>
>     public synchronized FetchType fetch(String cs){
>         if(debug_level == 1)
>             System.out.println(new java.util.Date(System.currentTimeMillis()) +
>                     " : DAO : fetching element from database");
>
>         /**
>          * prepare the select query
>          * execute it -- pull the items from the queue database
>          * load the query results into a return container
>          * clean up
>          * prepare the update query
>          * execute it -- update the record as active
>          * clean up
>          * commit the transaction
>          * return the query results
>          */
>
>         try {
>             if(!dbq.isValid(10))
>                 connectQ();
>         } catch (SQLException e1) {
>             e1.printStackTrace();
>         }
>
>         PreparedStatement stmt = null;
>         PreparedStatement stmt2 = null;
>         ResultSet rset = null;
>         FetchType ret = null;
>
>         // TODO: use a stored function
>         String     querySelect    =
>                 "select \"URL\",\"PublishDate\",\"SiteName\",\"Classification\",\"Special\",\"NextCrawlDate\" " +
>                 "from \"crawlq\" "    +
>                 "where \"Special\" = ? "        +
>                 "AND \"Active\" = 'true' "            +
>                 "AND \"TimeoutDate\" <= now() "     +
>                 "AND \"CrawlError\" = 'false' "        +
>                 "OR "                                 +
>                 "\"Special\" = ? "             +
>                 "AND \"Active\" = 'false' "            +
>                 "AND \"CrawlError\" = 'false' "        +
>                 "order by \"NextCrawlDate\" asc limit 1";
>
>         String queryUpdateActive =
>                 "update \"crawlq\" "                 +
>                     "set \"Active\" = 'true', "     +
>                     "\"TimeoutDate\" = now() + interval '5 minutes' "     +
>                 "where \"URL\" = ? "                ;
>
>         try {
>             stmt = dbq.prepareStatement(querySelect);
>             stmt.setEscapeProcessing(true);
>             stmt.setString(1, cs);
>             stmt.setString(2, cs);
>             rset = stmt.executeQuery();
>
>             if(rset.next()){
>                 ret = new FetchType(
>                         rset.getString("URL"),
>                         rset.getString("SiteName"),
>                         rset.getString("Classification"),
>                         rset.getDate("PublishDate"),
>                         rset.getString("Special")
>                         );
>             } else
>                 ret = null;
>
>             rset.close();
>             stmt.close();
>
>             if (ret != null){
>                 stmt2 = dbq.prepareStatement(queryUpdateActive);
>                 stmt2.setEscapeProcessing(true);
>                 stmt2.setString(1, ret.getURL());
>                 stmt2.execute();
>                 stmt2.close();
>                 dbq.commit();
>             }
>
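>             if(debug_level == 1)
>                 // ret is null when the queue had no eligible row, so guard the call
>                 System.out.println(new java.util.Date(System.currentTimeMillis())
>                         + " : DAO : fetch complete " + (ret != null ? ret.getURL() : "(no URL available)"));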
>
>             return ret;
>         } catch (SQLException e) {
>             e.printStackTrace();
>             try {
>                 dbq.rollback();
>                 // either statement may still be null if the failure came early
>                 if (stmt != null) stmt.close();
>                 if (stmt2 != null) stmt2.close();
>             } catch (SQLException e2) {
>                 e2.printStackTrace();
>             }
>             return null;
>         }
>     }
>
> Running on one machine I'm bypassing the transaction concurrency issue
> by synchronizing the method. But the Java concurrency constructs I'm
> using here don't scale to multiple machines.
>
> At any rate, have I provided enough information to get the solution
> I'm looking for? Have I provided enough financial incentive to get
> this implemented? If so, please respond with code here to the list so
> multiple people don't implement it and expect to be paid. I can only
> pay one person, though if another person fixes a bug in a proposed
> solution I'm open to splitting the bounty however seems fair.
>
> Thanks for reading, hope to hear back!
>
> -David Noel
>
>

Hello David,

Your table crawlq must have a unique key along the lines of:-

crawlq_id SERIAL NOT NULL PRIMARY KEY USING INDEX TABLESPACE whatever,

Take the "limit 1" off your select so that you have a result set
containing 'n' rows, and select only the unique primary key.
Read a key from the result set. Then:-

BEGIN;
SELECT columns FROM crawlq WHERE crawlq_id = key FOR UPDATE;
(The row is now locked.)
Do your UPDATE inside a try/catch block.
No errors -- COMMIT;
Errors -- ROLLBACK; and report the reason to the application. It will
also be in the log, but it's better to find these things out a.s.a.p.
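
As a rough JDBC sketch of the steps above, assuming the crawlq_id key
column suggested earlier has been added to crawlq. The class and method
names (CrawlqClaimer, claim) are hypothetical. Repeating the eligibility
test in the locking SELECT is an addition to the outline, so that a row
another worker claimed in the meantime is skipped rather than handed out
twice.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical helper: claims one crawlq row by its (assumed) crawlq_id key. */
final class CrawlqClaimer {

    // Lock the candidate row; the WHERE clause repeats the queue's eligibility test.
    static final String LOCK_SQL =
        "SELECT \"URL\" FROM \"crawlq\" "
      + "WHERE crawlq_id = ? AND \"CrawlError\" = 'false' "
      + "AND (\"Active\" = 'false' "
      + "OR (\"Active\" = 'true' AND \"TimeoutDate\" <= now())) "
      + "FOR UPDATE";

    // Mark the locked row as active for the next five minutes.
    static final String UPDATE_SQL =
        "UPDATE \"crawlq\" SET \"Active\" = 'true', "
      + "\"TimeoutDate\" = now() + interval '5 minutes' "
      + "WHERE crawlq_id = ?";

    /** Returns the claimed URL, or null if the row was no longer eligible. */
    static String claim(Connection conn, int crawlqId) throws SQLException {
        boolean oldAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);                 // the next statement starts the transaction
        try (PreparedStatement lock = conn.prepareStatement(LOCK_SQL);
             PreparedStatement update = conn.prepareStatement(UPDATE_SQL)) {
            lock.setInt(1, crawlqId);
            String url = null;
            try (ResultSet rs = lock.executeQuery()) {
                if (rs.next()) {
                    url = rs.getString("URL");     // row is locked until commit/rollback
                }
            }
            if (url != null) {
                update.setInt(1, crawlqId);
                update.executeUpdate();
            }
            conn.commit();                         // releases the row lock
            return url;
        } catch (SQLException e) {
            conn.rollback();                       // undo and let the caller see the reason
            throw e;
        } finally {
            conn.setAutoCommit(oldAutoCommit);
        }
    }
}
```

A caller would loop over the keys from the first select and stop at the
first non-null result. Under the default READ COMMITTED isolation level,
a FOR UPDATE query that had to wait for another transaction re-checks
its WHERE clause against the updated row, which is why the second worker
gets no row back here.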

When you have worked through the result set, run the select again to
populate it with new ids.

I have no idea how many rows are in table crawlq. One idea would be to
create a table crawlq_processed and, instead of updating crawlq, delete
the row from crawlq and insert its data into crawlq_processed. Auto
vacuum crawlq every few hours or so.
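
One possible way to do the delete-and-insert in a single statement is a
data-modifying WITH query. This is only a sketch: it assumes PostgreSQL
9.1 or later, that crawlq_processed has the same column layout as
crawlq, and the crawlq_id key from above. The class and method names are
hypothetical.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical helper: moves a finished row from crawlq to crawlq_processed. */
final class CrawlqArchiver {

    // DELETE ... RETURNING feeds the removed row straight into the INSERT.
    // Assumes crawlq_processed has the same columns, in the same order, as crawlq.
    static final String MOVE_SQL =
        "WITH moved AS ("
      + "DELETE FROM \"crawlq\" WHERE crawlq_id = ? RETURNING *"
      + ") INSERT INTO crawlq_processed SELECT * FROM moved";

    /** Returns true if exactly one row was moved. */
    static boolean archive(Connection conn, int crawlqId) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(MOVE_SQL)) {
            ps.setInt(1, crawlqId);
            // The update count is the number of rows inserted into crawlq_processed.
            return ps.executeUpdate() == 1;
        }
    }
}
```

Because it is one statement, the delete and the insert succeed or fail
together. With auto-commit off, the caller still has to commit.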

Anyway, the basic flow is:-
BEGIN;
SELECT FOR UPDATE;
do all your processing;
COMMIT; or ROLLBACK;

Hope this helps.
Cheers,
Robert



