diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 1482436..ff15642 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1623,7 +1623,19 @@ remove_async_node(ForeignScanState *node)
 	PgFdwScanState		*prev_state;
 	ForeignScanState	*cur;
 
-	/* no need to remove me */
+	if (fsstate->s.commonstate->busy)
+	{
+		/*
+		 * this node is waiting for result, absorb the result first so
+		 * that the following commands can be sent on the connection.
+		 */
+		PGconn *conn = fsstate->s.conn;
+
+		while(PQisBusy(conn))
+			PQclear(PQgetResult(conn));
+	}
+
+    /* no need to remove me */
 	if (!leader || !fsstate->queued)
 		return;
 
@@ -1631,23 +1643,7 @@ remove_async_node(ForeignScanState *node)
 
 	if (leader == node)
 	{
-		if (leader_state->s.commonstate->busy)
-		{
-			/*
-			 * this node is waiting for result, absorb the result first so
-			 * that the following commands can be sent on the connection.
-			 */
-			PgFdwScanState *leader_state = GetPgFdwScanState(leader);
-			PGconn *conn = leader_state->s.conn;
-
-			while(PQisBusy(conn))
-				PQclear(PQgetResult(conn));
-
-			leader_state->s.commonstate->busy = false;
-		}
-
 		move_to_next_waiter(node);
-
 		return;
 	}
 
@@ -1858,7 +1854,7 @@ postgresEndForeignScan(ForeignScanState *node)
 	/* Release remote connection */
 	ReleaseConnection(fsstate->s.conn);
 	fsstate->s.conn = NULL;
-
+	fsstate->s.commonstate->leader = NULL;
 	/* MemoryContexts will be deleted automatically. */
 }
 
