When the lock master processes a successful operation (request,
convert, cancel, or unlock), it will process the effects of the
change before sending the reply for the operation. The "effects"
of the operation are:
- blocking callbacks (basts) for any newly granted locks
- waiting or converting locks that can now be granted
The cast is queued on the local node when the reply from the lock
master is received. This means that a lock holder can receive a
bast for a lock mode that is doesn't yet know has been granted.
Signed-off-by: David Teigland <teigland@redhat.com>
if (can_be_queued(lkb)) {
error = -EINPROGRESS;
add_lkb(r, lkb, DLM_LKSTS_WAITING);
if (can_be_queued(lkb)) {
error = -EINPROGRESS;
add_lkb(r, lkb, DLM_LKSTS_WAITING);
- send_blocking_asts(r, lkb);
add_timeout(lkb);
goto out;
}
error = -EAGAIN;
add_timeout(lkb);
goto out;
}
error = -EAGAIN;
- if (force_blocking_asts(lkb))
- send_blocking_asts_all(r, lkb);
queue_cast(r, lkb, -EAGAIN);
queue_cast(r, lkb, -EAGAIN);
+static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
+ int error)
+{
+ switch (error) {
+ case -EAGAIN:
+ if (force_blocking_asts(lkb))
+ send_blocking_asts_all(r, lkb);
+ break;
+ case -EINPROGRESS:
+ send_blocking_asts(r, lkb);
+ break;
+ }
+}
+
static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
int error = 0;
static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
int error = 0;
if (can_be_granted(r, lkb, 1, &deadlk)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
if (can_be_granted(r, lkb, 1, &deadlk)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
- grant_pending_locks(r);
if (_can_be_granted(r, lkb, 1)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
if (_can_be_granted(r, lkb, 1)) {
grant_lock(r, lkb);
queue_cast(r, lkb, 0);
- grant_pending_locks(r);
goto out;
}
/* else fall through and move to convert queue */
goto out;
}
/* else fall through and move to convert queue */
error = -EINPROGRESS;
del_lkb(r, lkb);
add_lkb(r, lkb, DLM_LKSTS_CONVERT);
error = -EINPROGRESS;
del_lkb(r, lkb);
add_lkb(r, lkb, DLM_LKSTS_CONVERT);
- send_blocking_asts(r, lkb);
add_timeout(lkb);
goto out;
}
error = -EAGAIN;
add_timeout(lkb);
goto out;
}
error = -EAGAIN;
- if (force_blocking_asts(lkb))
- send_blocking_asts_all(r, lkb);
queue_cast(r, lkb, -EAGAIN);
queue_cast(r, lkb, -EAGAIN);
+static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
+ int error)
+{
+ switch (error) {
+ case 0:
+ grant_pending_locks(r);
+ /* grant_pending_locks also sends basts */
+ break;
+ case -EAGAIN:
+ if (force_blocking_asts(lkb))
+ send_blocking_asts_all(r, lkb);
+ break;
+ case -EINPROGRESS:
+ send_blocking_asts(r, lkb);
+ break;
+ }
+}
+
static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
remove_lock(r, lkb);
queue_cast(r, lkb, -DLM_EUNLOCK);
static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
remove_lock(r, lkb);
queue_cast(r, lkb, -DLM_EUNLOCK);
- grant_pending_locks(r);
+static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
+ int error)
+{
+ grant_pending_locks(r);
+}
+
/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
error = revert_lock(r, lkb);
if (error) {
queue_cast(r, lkb, -DLM_ECANCEL);
error = revert_lock(r, lkb);
if (error) {
queue_cast(r, lkb, -DLM_ECANCEL);
- grant_pending_locks(r);
return -DLM_ECANCEL;
}
return 0;
}
return -DLM_ECANCEL;
}
return 0;
}
+static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
+ int error)
+{
+ if (error)
+ grant_pending_locks(r);
+}
+
/*
* Four stage 3 varieties:
* _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
/*
* Four stage 3 varieties:
* _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
/* receive_request() calls do_request() on remote node */
error = send_request(r, lkb);
/* receive_request() calls do_request() on remote node */
error = send_request(r, lkb);
error = do_request(r, lkb);
error = do_request(r, lkb);
+ /* for remote locks the request_reply is sent
+ between do_request and do_request_effects */
+ do_request_effects(r, lkb, error);
+ }
/* receive_convert() calls do_convert() on remote node */
error = send_convert(r, lkb);
/* receive_convert() calls do_convert() on remote node */
error = send_convert(r, lkb);
error = do_convert(r, lkb);
error = do_convert(r, lkb);
+ /* for remote locks the convert_reply is sent
+ between do_convert and do_convert_effects */
+ do_convert_effects(r, lkb, error);
+ }
/* receive_unlock() calls do_unlock() on remote node */
error = send_unlock(r, lkb);
/* receive_unlock() calls do_unlock() on remote node */
error = send_unlock(r, lkb);
error = do_unlock(r, lkb);
error = do_unlock(r, lkb);
+ /* for remote locks the unlock_reply is sent
+ between do_unlock and do_unlock_effects */
+ do_unlock_effects(r, lkb, error);
+ }
/* receive_cancel() calls do_cancel() on remote node */
error = send_cancel(r, lkb);
/* receive_cancel() calls do_cancel() on remote node */
error = send_cancel(r, lkb);
error = do_cancel(r, lkb);
error = do_cancel(r, lkb);
+ /* for remote locks the cancel_reply is sent
+ between do_cancel and do_cancel_effects */
+ do_cancel_effects(r, lkb, error);
+ }
attach_lkb(r, lkb);
error = do_request(r, lkb);
send_request_reply(r, lkb, error);
attach_lkb(r, lkb);
error = do_request(r, lkb);
send_request_reply(r, lkb, error);
+ do_request_effects(r, lkb, error);
unlock_rsb(r);
put_rsb(r);
unlock_rsb(r);
put_rsb(r);
goto out;
receive_flags(lkb, ms);
goto out;
receive_flags(lkb, ms);
error = receive_convert_args(ls, lkb, ms);
error = receive_convert_args(ls, lkb, ms);
- if (error)
- goto out_reply;
+ if (error) {
+ send_convert_reply(r, lkb, error);
+ goto out;
+ }
+
reply = !down_conversion(lkb);
error = do_convert(r, lkb);
reply = !down_conversion(lkb);
error = do_convert(r, lkb);
if (reply)
send_convert_reply(r, lkb, error);
if (reply)
send_convert_reply(r, lkb, error);
+ do_convert_effects(r, lkb, error);
out:
unlock_rsb(r);
put_rsb(r);
out:
unlock_rsb(r);
put_rsb(r);
goto out;
receive_flags(lkb, ms);
goto out;
receive_flags(lkb, ms);
error = receive_unlock_args(ls, lkb, ms);
error = receive_unlock_args(ls, lkb, ms);
- if (error)
- goto out_reply;
+ if (error) {
+ send_unlock_reply(r, lkb, error);
+ goto out;
+ }
error = do_unlock(r, lkb);
error = do_unlock(r, lkb);
send_unlock_reply(r, lkb, error);
send_unlock_reply(r, lkb, error);
+ do_unlock_effects(r, lkb, error);
out:
unlock_rsb(r);
put_rsb(r);
out:
unlock_rsb(r);
put_rsb(r);
error = do_cancel(r, lkb);
send_cancel_reply(r, lkb, error);
error = do_cancel(r, lkb);
send_cancel_reply(r, lkb, error);
+ do_cancel_effects(r, lkb, error);
out:
unlock_rsb(r);
put_rsb(r);
out:
unlock_rsb(r);
put_rsb(r);