内容简介:聊聊pg jdbc的queryTimeout及next方法
@Test public void testReadTimeout() throws SQLException { Connection connection = dataSource.getConnection(); //https://jdbc.postgresql.org/documentation/head/query.html connection.setAutoCommit(false); //NOTE 为了设置fetchSize,必须设置为false String sql = "select * from demo_table"; PreparedStatement pstmt; try { pstmt = (PreparedStatement)connection.prepareStatement(sql); pstmt.setQueryTimeout(1); //NOTE 设置Statement执行完成的超时时间,前提是socket的timeout比这个大 pstmt.setFetchSize(5000); //NOTE 这样设置为了模拟query timeout的异常 System.out.println("ps.getQueryTimeout():" + pstmt.getQueryTimeout()); System.out.println("ps.getFetchSize():" + pstmt.getFetchSize()); System.out.println("ps.getFetchDirection():" + pstmt.getFetchDirection()); System.out.println("ps.getMaxFieldSize():" + pstmt.getMaxFieldSize()); ResultSet rs = pstmt.executeQuery(); //NOTE 设置Statement执行完成的超时时间,前提是socket的timeout比这个大 //NOTE 这里返回了就代表statement执行完成,默认返回fetchSize的数据 int col = rs.getMetaData().getColumnCount(); System.out.println("============================"); while (rs.next()) { //NOTE 这个的timeout由socket的超时时间设置,oracle.jdbc.ReadTimeout=60000 for (int i = 1; i <= col; i++) { System.out.print(rs.getObject(i)); } System.out.println(""); } System.out.println("============================"); } catch (SQLException e) { e.printStackTrace(); } finally { //close resources } }
PgStatement
ostgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgStatement.java
executeInternal()
private void executeInternal(CachedQuery cachedQuery, ParameterList queryParameters, int flags) throws SQLException { closeForNextExecution(); // Enable cursor-based resultset if possible. if (fetchSize > 0 && !wantsScrollableResultSet() && !connection.getAutoCommit() && !wantsHoldableResultSet()) { flags |= QueryExecutor.QUERY_FORWARD_CURSOR; } if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) { flags |= QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS; // If the no results flag is set (from executeUpdate) // clear it so we get the generated keys results. // if ((flags & QueryExecutor.QUERY_NO_RESULTS) != 0) { flags &= ~(QueryExecutor.QUERY_NO_RESULTS); } } if (isOneShotQuery(cachedQuery)) { flags |= QueryExecutor.QUERY_ONESHOT; } // Only use named statements after we hit the threshold. Note that only // named statements can be transferred in binary format. if (connection.getAutoCommit()) { flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN; } // updateable result sets do not yet support binary updates if (concurrency != ResultSet.CONCUR_READ_ONLY) { flags |= QueryExecutor.QUERY_NO_BINARY_TRANSFER; } Query queryToExecute = cachedQuery.query; if (queryToExecute.isEmpty()) { flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN; } if (!queryToExecute.isStatementDescribed() && forceBinaryTransfers && (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) { // Simple 'Q' execution does not need to know parameter types // When binaryTransfer is forced, then we need to know resulting parameter and column types, // thus sending a describe request. int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY; StatementResultHandler handler2 = new StatementResultHandler(); connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler2, 0, 0, flags2); ResultWrapper result2 = handler2.getResults(); if (result2 != null) { result2.getResultSet().close(); } } StatementResultHandler handler = new StatementResultHandler(); result = null; try { startTimer(); connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows, fetchSize, flags); } finally { killTimerTask(); } result = firstUnclosedResult = handler.getResults(); if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) { generatedKeys = result; result = result.getNext(); if (wantsGeneratedKeysOnce) { wantsGeneratedKeysOnce = false; } } }
注意,这里在执行前后分别调用了startTimer()和killTimerTask()
startTimer()
private void startTimer() { /* * there shouldn't be any previous timer active, but better safe than sorry. */ cleanupTimer(); STATE_UPDATER.set(this, StatementCancelState.IN_QUERY); if (timeout == 0) { return; } TimerTask cancelTask = new TimerTask() { public void run() { try { if (!CANCEL_TIMER_UPDATER.compareAndSet(PgStatement.this, this, null)) { // Nothing to do here, statement has already finished and cleared // cancelTimerTask reference return; } PgStatement.this.cancel(); } catch (SQLException e) { } } }; CANCEL_TIMER_UPDATER.set(this, cancelTask); connection.addTimerTask(cancelTask, timeout); }
- startTimer调用了cleanupTimer()
- cancelTask调用的是PgStatement.this.cancel()
- 最后调用connection.addTimerTask添加定时任务
cleanupTimer()
/** * Clears {@link #cancelTimerTask} if any. Returns true if and only if "cancel" timer task would * never invoke {@link #cancel()}. */ private boolean cleanupTimer() { TimerTask timerTask = CANCEL_TIMER_UPDATER.get(this); if (timerTask == null) { // If timeout is zero, then timer task did not exist, so we safely report "all clear" return timeout == 0; } if (!CANCEL_TIMER_UPDATER.compareAndSet(this, timerTask, null)) { // Failed to update reference -> timer has just fired, so we must wait for the query state to // become "cancelling". return false; } timerTask.cancel(); connection.purgeTimerTasks(); // All clear return true; }
注意这里更新statement状态之后,调用task的cancel,以及connection.purgeTimerTasks()
cancel()
public void cancel() throws SQLException { if (!STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY, StatementCancelState.CANCELING)) { // Not in query, there's nothing to cancel return; } try { // Synchronize on connection to avoid spinning in killTimerTask synchronized (connection) { connection.cancelQuery(); } } finally { STATE_UPDATER.set(this, StatementCancelState.CANCELLED); synchronized (connection) { connection.notifyAll(); // wake-up killTimerTask } } }
executeQuery超时了则直接调用connection.cancelQuery()
public void cancelQuery() throws SQLException { checkClosed(); queryExecutor.sendQueryCancel(); }
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/QueryExecutorBase.java
public void sendQueryCancel() throws SQLException { if (cancelPid <= 0) { return; } PGStream cancelStream = null; // Now we need to construct and send a cancel packet try { if (logger.logDebug()) { logger.debug(" FE=> CancelRequest(pid=" + cancelPid + ",ckey=" + cancelKey + ")"); } cancelStream = new PGStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), cancelSignalTimeout); if (cancelSignalTimeout > 0) { cancelStream.getSocket().setSoTimeout(cancelSignalTimeout); } cancelStream.sendInteger4(16); cancelStream.sendInteger2(1234); cancelStream.sendInteger2(5678); cancelStream.sendInteger4(cancelPid); cancelStream.sendInteger4(cancelKey); cancelStream.flush(); cancelStream.receiveEOF(); } catch (IOException e) { // Safe to ignore. if (logger.logDebug()) { logger.debug("Ignoring exception on cancel request:", e); } } finally { if (cancelStream != null) { try { cancelStream.close(); } catch (IOException e) { // Ignored. } } } }
向数据库server发送cancel指令
killTimerTask()
private void killTimerTask() { boolean timerTaskIsClear = cleanupTimer(); // The order is important here: in case we need to wait for the cancel task, the state must be // kept StatementCancelState.IN_QUERY, so cancelTask would be able to cancel the query. // It is believed that this case is very rare, so "additional cancel and wait below" would not // harm it. if (timerTaskIsClear && STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY, StatementCancelState.IDLE)) { return; } // Being here means someone managed to call .cancel() and our connection did not receive // "timeout error" // We wait till state becomes "cancelled" boolean interrupted = false; while (!STATE_UPDATER.compareAndSet(this, StatementCancelState.CANCELLED, StatementCancelState.IDLE)) { synchronized (connection) { try { // Note: wait timeout here is irrelevant since synchronized(connection) would block until // .cancel finishes connection.wait(10); } catch (InterruptedException e) { // NOSONAR // Either re-interrupt this method or rethrow the "InterruptedException" interrupted = true; } } } if (interrupted) { Thread.currentThread().interrupt(); } }
这里先调用cleanupTimer,然后更新statement的状态
PgConnection
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgConnection.java
getTimer()
private synchronized Timer getTimer() { if (cancelTimer == null) { cancelTimer = Driver.getSharedTimer().getTimer(); } return cancelTimer; }
这里创建或获取一个timer
addTimerTask()
public void addTimerTask(TimerTask timerTask, long milliSeconds) { Timer timer = getTimer(); timer.schedule(timerTask, milliSeconds); }
这个添加timerTask就是直接调度了
purgeTimerTasks()
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgConnection.java
public void purgeTimerTasks() { Timer timer = cancelTimer; if (timer != null) { timer.purge(); } }
在cleanupTimer中被调用,用来清理已经被cancel掉的timer task
PgResultSet
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgResultSet.java
next()
public boolean next() throws SQLException { checkClosed(); if (onInsertRow) { throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."), PSQLState.INVALID_CURSOR_STATE); } if (current_row + 1 >= rows.size()) { if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) { current_row = rows.size(); this_row = null; rowBuffer = null; return false; // End of the resultset. } // Ask for some more data. row_offset += rows.size(); // We are discarding some data. int fetchRows = fetchSize; if (maxRows != 0) { if (fetchRows == 0 || row_offset + fetchRows > maxRows) { // Fetch would exceed maxRows, limit it. fetchRows = maxRows - row_offset; } } // Execute the fetch and update this resultset. connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows); current_row = 0; // Test the new rows array. if (rows.isEmpty()) { this_row = null; rowBuffer = null; return false; } } else { current_row++; } initRowBuffer(); return true; }
这里的fetch没有像executeQuery那样加timer
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/v3/QueryExecutorImpl.java
public synchronized void fetch(ResultCursor cursor, ResultHandler handler, int fetchSize) throws SQLException { waitOnLock(); final Portal portal = (Portal) cursor; // Insert a ResultHandler that turns bare command statuses into empty datasets // (if the fetch returns no rows, we see just a CommandStatus..) final ResultHandler delegateHandler = handler; handler = new ResultHandlerDelegate(delegateHandler) { public void handleCommandStatus(String status, int updateCount, long insertOID) { handleResultRows(portal.getQuery(), null, new ArrayList<byte[][]>(), null); } }; // Now actually run it. try { processDeadParsedQueries(); processDeadPortals(); sendExecute(portal.getQuery(), portal, fetchSize); sendSync(); processResults(handler, 0); estimatedReceiveBufferBytes = 0; } catch (IOException e) { abort(); handler.handleError( new PSQLException(GT.tr("An I/O error occurred while sending to the backend."), PSQLState.CONNECTION_FAILURE, e)); } handler.handleCompletion(); }
小结
- queryTimeout是采用添加timer来控制,如果请求过多,可能会造成timer过多
timeout时间不宜过长,不过正常执行完sql,会调用killTimerTask()方,里头会先cleanupTimer,取消timerTask,然后调用purgeTimerTasks()清理cancel掉的task,避免timeout时间过长导致task堆积最后内存溢出
- 超时之后会timer task会向数据库server发送cancel query指令
-
发送完cancel query指令之后,client端的查询按预期应该抛出SQLException(
这里头的机制有待深入研究,可能是server端返回timeout error
) - executeQuery方法默认会拉取fetchSize的数据并返回
- next()方法根据需要再去fetch,这个fetch方法就没有timer来限制时间了,但是最底层应该是受socketTimeout限制
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 聊聊storm TridentBoltExecutor的finishBatch方法
- 聊聊动态规划(2) -- 特征
- 聊聊动态规划(1) -- 概念
- 聊聊基准测试
- 聊聊MyBatis缓存机制
- 聊聊 HashMap
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Lighttpd
Andre Bogus / Packt Publishing / 2008-10 / 39.99
This is your fast guide to getting started and getting inside the Lighttpd web server. Written from a developer's perspective, this book helps you understand Lighttpd, and get it set up as securely an......一起来看看 《Lighttpd》 这本书的介绍吧!
XML、JSON 在线转换
在线XML、JSON转换工具
XML 在线格式化
在线 XML 格式化压缩工具