#include "ldlm_internal.h"
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
- void *data, int flag);
+ void *data, int flag);
/**
* list_for_remaining_safe - iterate over the remaining entries in a list
* \param head the head for your list.
*/
#define list_for_remaining_safe(pos, n, head) \
- for (n = pos->next; pos != (head); pos = n, n = pos->next)
+ for (n = pos->next; pos != (head); pos = n, n = pos->next)
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
- return((new->l_policy_data.l_flock.owner ==
- lock->l_policy_data.l_flock.owner) &&
- (new->l_export == lock->l_export));
+ return ((new->l_policy_data.l_flock.owner ==
+ lock->l_policy_data.l_flock.owner) &&
+ (new->l_export == lock->l_export));
}
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
- return((new->l_policy_data.l_flock.start <=
- lock->l_policy_data.l_flock.end) &&
- (new->l_policy_data.l_flock.end >=
- lock->l_policy_data.l_flock.start));
+ return ((new->l_policy_data.l_flock.start <=
+ lock->l_policy_data.l_flock.end) &&
+ (new->l_policy_data.l_flock.end >=
+ lock->l_policy_data.l_flock.start));
}
static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
struct ldlm_lock *lock)
{
- /* For server only */
- if (req->l_export == NULL)
+ /* For server only */
+ if (req->l_export == NULL)
return;
LASSERT(hlist_unhashed(&req->l_exp_flock_hash));
- req->l_policy_data.l_flock.blocking_owner =
- lock->l_policy_data.l_flock.owner;
- req->l_policy_data.l_flock.blocking_export =
+ req->l_policy_data.l_flock.blocking_owner =
+ lock->l_policy_data.l_flock.owner;
+ req->l_policy_data.l_flock.blocking_export =
lock->l_export;
atomic_set(&req->l_policy_data.l_flock.blocking_refs, 0);
static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
- /* For server only */
- if (req->l_export == NULL)
- return;
+ /* For server only */
+ if (req->l_export == NULL)
+ return;
check_res_locked(req->l_resource);
if (req->l_export->exp_flock_hash != NULL &&
/* client side - set a flag to prevent sending a CANCEL */
lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
- /* when reaching here, it is under lock_res_and_lock(). Thus,
- need call the nolock version of ldlm_lock_decref_internal*/
- ldlm_lock_decref_internal_nolock(lock, mode);
- }
+ /* when reaching here, it is under lock_res_and_lock(). Thus,
+ * need call the nolock version of ldlm_lock_decref_internal
+ */
+ ldlm_lock_decref_internal_nolock(lock, mode);
+ }
- ldlm_lock_destroy_nolock(lock);
- EXIT;
+ ldlm_lock_destroy_nolock(lock);
+ EXIT;
}
/**
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
- struct obd_export *req_exp = req->l_export;
- struct obd_export *bl_exp = bl_lock->l_export;
- __u64 req_owner = req->l_policy_data.l_flock.owner;
- __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;
+ struct obd_export *req_exp = req->l_export;
+ struct obd_export *bl_exp = bl_lock->l_export;
+ __u64 req_owner = req->l_policy_data.l_flock.owner;
+ __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;
- /* For server only */
- if (req_exp == NULL)
- return 0;
+ /* For server only */
+ if (req_exp == NULL)
+ return 0;
- class_export_get(bl_exp);
+ class_export_get(bl_exp);
while (1) {
struct ldlm_flock_lookup_cb_data cb_data = {
- .bl_owner = &bl_owner,
- .lock = NULL,
- .exp = NULL };
+ .bl_owner = &bl_owner,
+ .lock = NULL,
+ .exp = NULL,
+ };
struct obd_export *bl_exp_new;
struct ldlm_lock *lock = NULL;
struct ldlm_flock *flock;
if (bl_exp->exp_flock_hash != NULL) {
- cfs_hash_for_each_key(bl_exp->exp_obd->obd_nid_hash,
+ cfs_hash_for_each_key(
+ bl_exp->exp_obd->obd_nid_hash,
&bl_exp->exp_connection->c_peer.nid,
ldlm_flock_lookup_cb, &cb_data);
lock = cb_data.lock;
LASSERT(req != lock);
flock = &lock->l_policy_data.l_flock;
LASSERT(flock->owner == bl_owner);
- bl_owner = flock->blocking_owner;
- bl_exp_new = class_export_get(flock->blocking_export);
- class_export_put(bl_exp);
+ bl_owner = flock->blocking_owner;
+ bl_exp_new = class_export_get(flock->blocking_export);
+ class_export_put(bl_exp);
cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
- bl_exp = bl_exp_new;
+ bl_exp = bl_exp_new;
if (bl_exp->exp_failed)
break;
if (bl_owner == req_owner &&
(bl_exp->exp_connection->c_peer.nid ==
req_exp->exp_connection->c_peer.nid)) {
- class_export_put(bl_exp);
- return 1;
- }
- }
- class_export_put(bl_exp);
+ class_export_put(bl_exp);
+ return 1;
+ }
+ }
+ class_export_put(bl_exp);
- return 0;
+ return 0;
}
static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);
if ((exp_connect_flags(lock->l_export) &
- OBD_CONNECT_FLOCK_DEAD) == 0) {
- CERROR("deadlock found, but client doesn't "
- "support flock canceliation\n");
+ OBD_CONNECT_FLOCK_DEAD) == 0) {
+ CERROR("deadlock found, but client doesn't support flock canceliation\n");
} else {
LASSERT(lock->l_completion_ast);
LASSERT(!ldlm_is_ast_sent(lock));
- lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
- LDLM_FL_FLOCK_DEADLOCK;
+ lock->l_flags |= (LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
+ LDLM_FL_FLOCK_DEADLOCK);
ldlm_flock_blocking_unlink(lock);
ldlm_resource_unlink_lock(lock);
ldlm_add_ast_work_item(lock, NULL, work_list);
int overlaps = 0;
int splitted = 0;
const struct ldlm_callback_suite null_cbs = { NULL };
- struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
- NULL : work_list;
+ struct list_head *grant_work = (intention == LDLM_PROCESS_ENQUEUE ?
+ NULL : work_list);
ENTRY;
CDEBUG(D_DLMTRACE, "flags %#llx owner %llu pid %u mode %u start "
"%llu end %llu\n", *flags,
new->l_policy_data.l_flock.owner,
- new->l_policy_data.l_flock.pid, mode,
- req->l_policy_data.l_flock.start,
- req->l_policy_data.l_flock.end);
+ new->l_policy_data.l_flock.pid, mode,
+ req->l_policy_data.l_flock.start,
+ req->l_policy_data.l_flock.end);
- *err = ELDLM_OK;
+ *err = ELDLM_OK;
- if (local) {
- /* No blocking ASTs are sent to the clients for
- * Posix file & record locks */
- req->l_blocking_ast = NULL;
- } else {
- /* Called on the server for lock cancels. */
- req->l_blocking_ast = ldlm_flock_blocking_ast;
- }
+ if (local) {
+ /* No blocking ASTs are sent to the clients for
+ * Posix file & record locks
+ */
+ req->l_blocking_ast = NULL;
+ } else {
+ /* Called on the server for lock cancels. */
+ req->l_blocking_ast = ldlm_flock_blocking_ast;
+ }
reprocess:
- if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
- /* This loop determines where this processes locks start
- * in the resource lr_granted list. */
+ if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
+ /* This loop determines where this processes locks start
+ * in the resource lr_granted list.
+ */
list_for_each(tmp, &res->lr_granted) {
lock = list_entry(tmp, struct ldlm_lock,
- l_res_link);
- if (ldlm_same_flock_owner(lock, req)) {
- ownlocks = tmp;
- break;
- }
- }
- } else {
+ l_res_link);
+ if (ldlm_same_flock_owner(lock, req)) {
+ ownlocks = tmp;
+ break;
+ }
+ }
+ } else {
int reprocess_failed = 0;
- lockmode_verify(mode);
+ lockmode_verify(mode);
- /* This loop determines if there are existing locks
- * that conflict with the new lock request. */
+ /* This loop determines if there are existing locks
+ * that conflict with the new lock request.
+ */
list_for_each(tmp, &res->lr_granted) {
lock = list_entry(tmp, struct ldlm_lock,
- l_res_link);
+ l_res_link);
- if (ldlm_same_flock_owner(lock, req)) {
- if (!ownlocks)
- ownlocks = tmp;
- continue;
- }
+ if (ldlm_same_flock_owner(lock, req)) {
+ if (!ownlocks)
+ ownlocks = tmp;
+ continue;
+ }
- /* locks are compatible, overlap doesn't matter */
- if (lockmode_compat(lock->l_granted_mode, mode))
- continue;
+ /* locks are compatible, overlap doesn't matter */
+ if (lockmode_compat(lock->l_granted_mode, mode))
+ continue;
- if (!ldlm_flocks_overlap(lock, req))
- continue;
+ if (!ldlm_flocks_overlap(lock, req))
+ continue;
if (intention != LDLM_PROCESS_ENQUEUE) {
reprocess_failed = 1;
if (ldlm_flock_deadlock(req, lock)) {
- ldlm_flock_cancel_on_deadlock(req,
- grant_work);
+ ldlm_flock_cancel_on_deadlock(
+ req, grant_work);
RETURN(LDLM_ITER_CONTINUE);
}
continue;
}
- if (*flags & LDLM_FL_BLOCK_NOWAIT) {
- ldlm_flock_destroy(req, mode, *flags);
- *err = -EAGAIN;
- RETURN(LDLM_ITER_STOP);
- }
-
- if (*flags & LDLM_FL_TEST_LOCK) {
- ldlm_flock_destroy(req, mode, *flags);
- req->l_req_mode = lock->l_granted_mode;
- req->l_policy_data.l_flock.pid =
- lock->l_policy_data.l_flock.pid;
- req->l_policy_data.l_flock.start =
- lock->l_policy_data.l_flock.start;
- req->l_policy_data.l_flock.end =
- lock->l_policy_data.l_flock.end;
- *flags |= LDLM_FL_LOCK_CHANGED;
- RETURN(LDLM_ITER_STOP);
- }
+ if (*flags & LDLM_FL_BLOCK_NOWAIT) {
+ ldlm_flock_destroy(req, mode, *flags);
+ *err = -EAGAIN;
+ RETURN(LDLM_ITER_STOP);
+ }
+
+ if (*flags & LDLM_FL_TEST_LOCK) {
+ ldlm_flock_destroy(req, mode, *flags);
+ req->l_req_mode = lock->l_granted_mode;
+ req->l_policy_data.l_flock.pid =
+ lock->l_policy_data.l_flock.pid;
+ req->l_policy_data.l_flock.start =
+ lock->l_policy_data.l_flock.start;
+ req->l_policy_data.l_flock.end =
+ lock->l_policy_data.l_flock.end;
+ *flags |= LDLM_FL_LOCK_CHANGED;
+ RETURN(LDLM_ITER_STOP);
+ }
/* add lock to blocking list before deadlock
- * check to prevent race */
+ * check to prevent race
+ */
ldlm_flock_blocking_link(req, lock);
if (ldlm_flock_deadlock(req, lock)) {
RETURN(LDLM_ITER_STOP);
}
- ldlm_resource_add_lock(res, &res->lr_waiting, req);
- *flags |= LDLM_FL_BLOCK_GRANTED;
- RETURN(LDLM_ITER_STOP);
- }
+ ldlm_resource_add_lock(res, &res->lr_waiting, req);
+ *flags |= LDLM_FL_BLOCK_GRANTED;
+ RETURN(LDLM_ITER_STOP);
+ }
if (reprocess_failed)
RETURN(LDLM_ITER_CONTINUE);
- }
+ }
- if (*flags & LDLM_FL_TEST_LOCK) {
- ldlm_flock_destroy(req, mode, *flags);
- req->l_req_mode = LCK_NL;
- *flags |= LDLM_FL_LOCK_CHANGED;
- RETURN(LDLM_ITER_STOP);
- }
+ if (*flags & LDLM_FL_TEST_LOCK) {
+ ldlm_flock_destroy(req, mode, *flags);
+ req->l_req_mode = LCK_NL;
+ *flags |= LDLM_FL_LOCK_CHANGED;
+ RETURN(LDLM_ITER_STOP);
+ }
- /* In case we had slept on this lock request take it off of the
- * deadlock detection hash list. */
- ldlm_flock_blocking_unlink(req);
+ /* In case we had slept on this lock request take it off of the
+ * deadlock detection hash list.
+ */
+ ldlm_flock_blocking_unlink(req);
- /* Scan the locks owned by this process that overlap this request.
- * We may have to merge or split existing locks. */
+ /* Scan the locks owned by this process that overlap this request.
+ * We may have to merge or split existing locks.
+ */
- if (!ownlocks)
- ownlocks = &res->lr_granted;
+ if (!ownlocks)
+ ownlocks = &res->lr_granted;
- list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
+ list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
- if (!ldlm_same_flock_owner(lock, new))
- break;
-
- if (lock->l_granted_mode == mode) {
- /* If the modes are the same then we need to process
- * locks that overlap OR adjoin the new lock. The extra
- * logic condition is necessary to deal with arithmetic
- * overflow and underflow. */
- if ((new->l_policy_data.l_flock.start >
- (lock->l_policy_data.l_flock.end + 1))
- && (lock->l_policy_data.l_flock.end !=
- OBD_OBJECT_EOF))
- continue;
-
- if ((new->l_policy_data.l_flock.end <
- (lock->l_policy_data.l_flock.start - 1))
- && (lock->l_policy_data.l_flock.start != 0))
- break;
-
- if (new->l_policy_data.l_flock.start <
- lock->l_policy_data.l_flock.start) {
- lock->l_policy_data.l_flock.start =
- new->l_policy_data.l_flock.start;
- } else {
- new->l_policy_data.l_flock.start =
- lock->l_policy_data.l_flock.start;
- }
-
- if (new->l_policy_data.l_flock.end >
- lock->l_policy_data.l_flock.end) {
- lock->l_policy_data.l_flock.end =
- new->l_policy_data.l_flock.end;
- } else {
- new->l_policy_data.l_flock.end =
- lock->l_policy_data.l_flock.end;
- }
-
- if (added) {
- ldlm_flock_destroy(lock, mode, *flags);
- } else {
- new = lock;
- added = 1;
- }
- continue;
- }
-
- if (new->l_policy_data.l_flock.start >
- lock->l_policy_data.l_flock.end)
- continue;
-
- if (new->l_policy_data.l_flock.end <
- lock->l_policy_data.l_flock.start)
- break;
-
- ++overlaps;
-
- if (new->l_policy_data.l_flock.start <=
- lock->l_policy_data.l_flock.start) {
- if (new->l_policy_data.l_flock.end <
- lock->l_policy_data.l_flock.end) {
- lock->l_policy_data.l_flock.start =
- new->l_policy_data.l_flock.end + 1;
- break;
- }
- ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
- continue;
- }
- if (new->l_policy_data.l_flock.end >=
- lock->l_policy_data.l_flock.end) {
- lock->l_policy_data.l_flock.end =
- new->l_policy_data.l_flock.start - 1;
- continue;
- }
-
- /* split the existing lock into two locks */
-
- /* if this is an F_UNLCK operation then we could avoid
- * allocating a new lock and use the req lock passed in
- * with the request but this would complicate the reply
- * processing since updates to req get reflected in the
- * reply. The client side replays the lock request so
- * it must see the original lock data in the reply. */
-
- /* XXX - if ldlm_lock_new() can sleep we should
- * release the lr_lock, allocate the new lock,
- * and restart processing this lock. */
+ if (!ldlm_same_flock_owner(lock, new))
+ break;
+
+ if (lock->l_granted_mode == mode) {
+ /* If the modes are the same then we need to process
+ * locks that overlap OR adjoin the new lock. The extra
+ * logic condition is necessary to deal with arithmetic
+ * overflow and underflow.
+ */
+ if ((new->l_policy_data.l_flock.start >
+ (lock->l_policy_data.l_flock.end + 1))
+ && (lock->l_policy_data.l_flock.end !=
+ OBD_OBJECT_EOF))
+ continue;
+
+ if ((new->l_policy_data.l_flock.end <
+ (lock->l_policy_data.l_flock.start - 1))
+ && (lock->l_policy_data.l_flock.start != 0))
+ break;
+
+ if (new->l_policy_data.l_flock.start <
+ lock->l_policy_data.l_flock.start) {
+ lock->l_policy_data.l_flock.start =
+ new->l_policy_data.l_flock.start;
+ } else {
+ new->l_policy_data.l_flock.start =
+ lock->l_policy_data.l_flock.start;
+ }
+
+ if (new->l_policy_data.l_flock.end >
+ lock->l_policy_data.l_flock.end) {
+ lock->l_policy_data.l_flock.end =
+ new->l_policy_data.l_flock.end;
+ } else {
+ new->l_policy_data.l_flock.end =
+ lock->l_policy_data.l_flock.end;
+ }
+
+ if (added) {
+ ldlm_flock_destroy(lock, mode, *flags);
+ } else {
+ new = lock;
+ added = 1;
+ }
+ continue;
+ }
+
+ if (new->l_policy_data.l_flock.start >
+ lock->l_policy_data.l_flock.end)
+ continue;
+
+ if (new->l_policy_data.l_flock.end <
+ lock->l_policy_data.l_flock.start)
+ break;
+
+ ++overlaps;
+
+ if (new->l_policy_data.l_flock.start <=
+ lock->l_policy_data.l_flock.start) {
+ if (new->l_policy_data.l_flock.end <
+ lock->l_policy_data.l_flock.end) {
+ lock->l_policy_data.l_flock.start =
+ new->l_policy_data.l_flock.end + 1;
+ break;
+ }
+ ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
+ continue;
+ }
+ if (new->l_policy_data.l_flock.end >=
+ lock->l_policy_data.l_flock.end) {
+ lock->l_policy_data.l_flock.end =
+ new->l_policy_data.l_flock.start - 1;
+ continue;
+ }
+
+ /* split the existing lock into two locks */
+
+ /* if this is an F_UNLCK operation then we could avoid
+ * allocating a new lock and use the req lock passed in
+ * with the request but this would complicate the reply
+ * processing since updates to req get reflected in the
+ * reply. The client side replays the lock request so
+ * it must see the original lock data in the reply.
+ */
+
+ /* XXX - if ldlm_lock_new() can sleep we should
+ * release the lr_lock, allocate the new lock,
+ * and restart processing this lock.
+ */
if (new2 == NULL) {
unlock_res_and_lock(req);
new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
goto reprocess;
}
- splitted = 1;
-
- new2->l_granted_mode = lock->l_granted_mode;
- new2->l_policy_data.l_flock.pid =
- new->l_policy_data.l_flock.pid;
- new2->l_policy_data.l_flock.owner =
- new->l_policy_data.l_flock.owner;
- new2->l_policy_data.l_flock.start =
- lock->l_policy_data.l_flock.start;
- new2->l_policy_data.l_flock.end =
- new->l_policy_data.l_flock.start - 1;
- lock->l_policy_data.l_flock.start =
- new->l_policy_data.l_flock.end + 1;
- new2->l_conn_export = lock->l_conn_export;
- if (lock->l_export != NULL) {
- new2->l_export = class_export_lock_get(lock->l_export, new2);
- if (new2->l_export->exp_lock_hash &&
+ splitted = 1;
+
+ new2->l_granted_mode = lock->l_granted_mode;
+ new2->l_policy_data.l_flock.pid =
+ new->l_policy_data.l_flock.pid;
+ new2->l_policy_data.l_flock.owner =
+ new->l_policy_data.l_flock.owner;
+ new2->l_policy_data.l_flock.start =
+ lock->l_policy_data.l_flock.start;
+ new2->l_policy_data.l_flock.end =
+ new->l_policy_data.l_flock.start - 1;
+ lock->l_policy_data.l_flock.start =
+ new->l_policy_data.l_flock.end + 1;
+ new2->l_conn_export = lock->l_conn_export;
+ if (lock->l_export != NULL) {
+ new2->l_export = class_export_lock_get(lock->l_export,
+ new2);
+ if (new2->l_export->exp_lock_hash &&
hlist_unhashed(&new2->l_exp_hash))
- cfs_hash_add(new2->l_export->exp_lock_hash,
- &new2->l_remote_handle,
- &new2->l_exp_hash);
- }
- if (*flags == LDLM_FL_WAIT_NOREPROC)
- ldlm_lock_addref_internal_nolock(new2,
- lock->l_granted_mode);
-
- /* insert new2 at lock */
- ldlm_resource_add_lock(res, ownlocks, new2);
- LDLM_LOCK_RELEASE(new2);
- break;
- }
-
- /* if new2 is created but never used, destroy it*/
- if (splitted == 0 && new2 != NULL)
- ldlm_lock_destroy_nolock(new2);
-
- /* At this point we're granting the lock request. */
- req->l_granted_mode = req->l_req_mode;
-
- /* Add req to the granted queue before calling ldlm_reprocess_all(). */
- if (!added) {
+ cfs_hash_add(new2->l_export->exp_lock_hash,
+ &new2->l_remote_handle,
+ &new2->l_exp_hash);
+ }
+ if (*flags == LDLM_FL_WAIT_NOREPROC)
+ ldlm_lock_addref_internal_nolock(new2,
+ lock->l_granted_mode);
+
+ /* insert new2 at lock */
+ ldlm_resource_add_lock(res, ownlocks, new2);
+ LDLM_LOCK_RELEASE(new2);
+ break;
+ }
+
+ /* if new2 is created but never used, destroy it*/
+ if (splitted == 0 && new2 != NULL)
+ ldlm_lock_destroy_nolock(new2);
+
+ /* At this point we're granting the lock request. */
+ req->l_granted_mode = req->l_req_mode;
+
+ /* Add req to the granted queue before calling ldlm_reprocess_all(). */
+ if (!added) {
list_del_init(&req->l_res_link);
- /* insert new lock before ownlocks in list. */
- ldlm_resource_add_lock(res, ownlocks, req);
- }
+ /* insert new lock before ownlocks in list. */
+ ldlm_resource_add_lock(res, ownlocks, req);
+ }
- if (*flags != LDLM_FL_WAIT_NOREPROC) {
+ if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
if (intention == LDLM_PROCESS_ENQUEUE) {
- /* If this is an unlock, reprocess the waitq and
- * send completions ASTs for locks that can now be
- * granted. The only problem with doing this
- * reprocessing here is that the completion ASTs for
- * newly granted locks will be sent before the unlock
- * completion is sent. It shouldn't be an issue. Also
- * note that ldlm_process_flock_lock() will recurse,
+ /* If this is an unlock, reprocess the waitq and
+ * send completions ASTs for locks that can now be
+ * granted. The only problem with doing this
+ * reprocessing here is that the completion ASTs for
+ * newly granted locks will be sent before the unlock
+ * completion is sent. It shouldn't be an issue. Also
+ * note that ldlm_process_flock_lock() will recurse,
* but only once because 'intention' won't be
- * LDLM_PROCESS_ENQUEUE from ldlm_reprocess_queue. */
+ * LDLM_PROCESS_ENQUEUE from ldlm_reprocess_queue.
+ */
if ((mode == LCK_NL) && overlaps) {
struct list_head rpc_list;
- int rc;
+ int rc;
INIT_LIST_HEAD(&rpc_list);
restart:
&rpc_list,
LDLM_PROCESS_RESCAN, NULL);
- unlock_res_and_lock(req);
- rc = ldlm_run_ast_work(ns, &rpc_list,
- LDLM_WORK_CP_AST);
- lock_res_and_lock(req);
+ unlock_res_and_lock(req);
+ rc = ldlm_run_ast_work(ns, &rpc_list,
+ LDLM_WORK_CP_AST);
+ lock_res_and_lock(req);
if (rc == -ERESTART)
GOTO(restart, rc);
- }
- } else {
- LASSERT(req->l_completion_ast);
+ }
+ } else {
+ LASSERT(req->l_completion_ast);
ldlm_add_ast_work_item(req, NULL, grant_work);
- }
+ }
#else /* !HAVE_SERVER_SUPPORT */
- /* The only one possible case for client-side calls flock
- * policy function is ldlm_flock_completion_ast inside which
- * carries LDLM_FL_WAIT_NOREPROC flag. */
- CERROR("Illegal parameter for client-side-only module.\n");
- LBUG();
+ /* The only one possible case for client-side calls flock
+ * policy function is ldlm_flock_completion_ast inside which
+ * carries LDLM_FL_WAIT_NOREPROC flag.
+ */
+ CERROR("Illegal parameter for client-side-only module.\n");
+ LBUG();
#endif /* HAVE_SERVER_SUPPORT */
- }
+ }
/* In case we're reprocessing the requested lock we can't destroy
* it until after calling ldlm_add_ast_work_item() above so that laawi()
* can bump the reference count on \a req. Otherwise \a req
- * could be freed before the completion AST can be sent. */
- if (added)
- ldlm_flock_destroy(req, mode, *flags);
+ * could be freed before the completion AST can be sent.
+ */
+ if (added)
+ ldlm_flock_destroy(req, mode, *flags);
- ldlm_resource_dump(D_INFO, res);
- RETURN(LDLM_ITER_CONTINUE);
+ ldlm_resource_dump(D_INFO, res);
+ RETURN(LDLM_ITER_CONTINUE);
}
struct ldlm_flock_wait_data {
- struct ldlm_lock *fwd_lock;
- int fwd_generation;
+ struct ldlm_lock *fwd_lock;
+ int fwd_generation;
};
static void
ldlm_flock_interrupted_wait(void *data)
{
- struct ldlm_lock *lock;
- ENTRY;
+ struct ldlm_lock *lock;
+ ENTRY;
- lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
+ lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
- /* take lock off the deadlock detection hash list. */
+ /* take lock off the deadlock detection hash list.
+ */
lock_res_and_lock(lock);
- ldlm_flock_blocking_unlink(lock);
+ ldlm_flock_blocking_unlink(lock);
/* client side - set flag to prevent lock from being put on LRU list */
ldlm_set_cbpending(lock);
- unlock_res_and_lock(lock);
+ unlock_res_and_lock(lock);
- EXIT;
+ EXIT;
}
/**
RETURN(0);
}
- LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
- "sleeping");
- fwd.fwd_lock = lock;
- obd = class_exp2obd(lock->l_conn_export);
+ LDLM_DEBUG(lock,
+ "client-side enqueue returned a blocked lock, sleeping");
+ fwd.fwd_lock = lock;
+ obd = class_exp2obd(lock->l_conn_export);
- /* if this is a local lock, there is no import */
- if (NULL != obd)
- imp = obd->u.cli.cl_import;
+ /* if this is a local lock, there is no import */
+ if (obd)
+ imp = obd->u.cli.cl_import;
- if (NULL != imp) {
+ if (imp) {
spin_lock(&imp->imp_lock);
fwd.fwd_generation = imp->imp_generation;
spin_unlock(&imp->imp_lock);
- }
+ }
- lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
+ lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
- /* Go to sleep until the lock is granted. */
- rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
+ /* Go to sleep until the lock is granted. */
+ rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
- if (rc) {
- LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
- rc);
- RETURN(rc);
- }
+ if (rc) {
+ LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
+ rc);
+ RETURN(rc);
+ }
granted:
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT4)) {
lock_res_and_lock(lock);
if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT5)) {
lock_res_and_lock(lock);
/* DEADLOCK is always set with CBPENDING */
- lock->l_flags |= LDLM_FL_FAIL_LOC |
- LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
+ lock->l_flags |= (LDLM_FL_FAIL_LOC |
+ LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING);
unlock_res_and_lock(lock);
OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT5, 4);
}
/* Import invalidation. We need to actually release the lock
* references being held, so that it can go away. No point in
* holding the lock even if app still believes it has it, since
- * server already dropped it anyway. Only for granted locks too. */
+ * server already dropped it anyway. Only for granted locks too.
+ */
/* Do the same for DEADLOCK'ed locks. */
if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
int mode;
wake_up(&lock->l_waitq);
/* An error is still to be returned, to propagate it up to
- * ldlm_cli_enqueue_fini() caller. */
+ * ldlm_cli_enqueue_fini() caller.
+ */
RETURN(rc ? : -EIO);
}
__u64 noreproc = LDLM_FL_WAIT_NOREPROC;
/* We need to reprocess the lock to do merges or splits
- * with existing locks owned by this process. */
+ * with existing locks owned by this process.
+ */
ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
}
unlock_res_and_lock(lock);
EXPORT_SYMBOL(ldlm_flock_completion_ast);
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
- void *data, int flag)
+ void *data, int flag)
{
- ENTRY;
+ ENTRY;
- LASSERT(lock);
- LASSERT(flag == LDLM_CB_CANCELING);
+ LASSERT(lock);
+ LASSERT(flag == LDLM_CB_CANCELING);
/* take lock off the deadlock detection hash list. */
lock_res_and_lock(lock);
- ldlm_flock_blocking_unlink(lock);
+ ldlm_flock_blocking_unlink(lock);
unlock_res_and_lock(lock);
- RETURN(0);
+ RETURN(0);
}
void ldlm_flock_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,