diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c
index 15f46e2..3abad6f 100644
--- a/lustre/ldlm/ldlm_flock.c
+++ b/lustre/ldlm/ldlm_flock.c
@@ -1,43 +1,47 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
- * Author: Peter Braam
- * Author: Phil Schwan
+ * Copyright (c) 2003 Hewlett-Packard Development Company LP.
+ * Developed under the sponsorship of the US Government under
+ * Subcontract No. B514193
  *
- * This file is part of Lustre, http://www.lustre.org.
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
  *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
  *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
  *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
  */
 
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #ifdef __KERNEL__
-#include
-#include
-#include
-#include
+#include
+#include
+#include
+#include
 #include
 #else
 #include
+#include
 #endif
 
 #include "ldlm_internal.h"
 
 #define l_flock_waitq   l_lru
 
-static struct list_head ldlm_flock_waitq = LIST_HEAD_INIT(ldlm_flock_waitq);
+static struct list_head ldlm_flock_waitq = CFS_LIST_HEAD_INIT(ldlm_flock_waitq);
 
 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                             void *data, int flag);
@@ -87,7 +91,7 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
                 ldlm_lock_decref_internal(lock, mode);
         }
 
-        ldlm_lock_destroy(lock);
+        ldlm_lock_destroy_nolock(lock);
         EXIT;
 }
 
@@ -120,7 +124,7 @@ restart:
 
 int
 ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
-                        ldlm_error_t *err)
+                        ldlm_error_t *err, struct list_head *work_list)
 {
         struct ldlm_resource *res = req->l_resource;
         struct ldlm_namespace *ns = res->lr_namespace;
@@ -130,13 +134,13 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
         struct ldlm_lock *new = req;
         struct ldlm_lock *new2 = NULL;
         ldlm_mode_t mode = req->l_req_mode;
-        int local = ns->ns_client;
+        int local = ns_is_client(ns);
         int added = (mode == LCK_NL);
         int overlaps = 0;
         ENTRY;
 
-        CDEBUG(D_DLMTRACE, "flags %#x pid "LPU64" mode %u start "LPU64" end "
-               LPU64"\n", *flags, new->l_policy_data.l_flock.pid, mode,
+        CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end "LPU64
+               "\n", *flags, new->l_policy_data.l_flock.pid, mode,
                req->l_policy_data.l_flock.start,
                req->l_policy_data.l_flock.end);
 
@@ -254,7 +258,8 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                  * overflow and underflow. */
                 if ((new->l_policy_data.l_flock.start >
                      (lock->l_policy_data.l_flock.end + 1))
-                    && (lock->l_policy_data.l_flock.end != ~0))
+                    && (lock->l_policy_data.l_flock.end !=
+                        OBD_OBJECT_EOF))
                         continue;
 
                 if ((new->l_policy_data.l_flock.end <
@@ -329,7 +334,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                         /* XXX - if ldlm_lock_new() can sleep we should
                          * release the ns_lock, allocate the new lock,
                          * and restart processing this lock. */
-                        new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK,
+                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                         lock->l_granted_mode, NULL, NULL, NULL,
                                         NULL, 0);
                         if (!new2) {
@@ -350,8 +355,10 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                 new2->l_conn_export = lock->l_conn_export;
                 if (lock->l_export != NULL) {
                         new2->l_export = class_export_get(lock->l_export);
+                        spin_lock(&new2->l_export->exp_ldlm_data.led_lock);
                         list_add(&new2->l_export_chain,
                                  &new2->l_export->exp_ldlm_data.led_held_locks);
+                        spin_unlock(&new2->l_export->exp_ldlm_data.led_lock);
                 }
                 if (*flags == LDLM_FL_WAIT_NOREPROC)
                         ldlm_lock_addref_internal(new2, lock->l_granted_mode);
@@ -375,7 +382,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
         if (*flags != LDLM_FL_WAIT_NOREPROC) {
                 if (first_enq) {
                         /* If this is an unlock, reprocess the waitq and
-                         * send completions ASTs for locks that can now be 
+                         * send completions ASTs for locks that can now be
                          * granted. The only problem with doing this
                          * reprocessing here is that the completion ASTs for
                          * newly granted locks will be sent before the unlock
@@ -385,23 +392,21 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                          * ldlm_reprocess_queue. */
                         if ((mode == LCK_NL) && overlaps) {
                                 struct list_head rpc_list
-                                                   = LIST_HEAD_INIT(rpc_list);
+                                               = CFS_LIST_HEAD_INIT(rpc_list);
                                 int rc;
 restart:
-                                res->lr_tmp = &rpc_list;
-                                ldlm_reprocess_queue(res, &res->lr_waiting);
-                                res->lr_tmp = NULL;
-
-                                l_unlock(&ns->ns_lock);
-                                rc = ldlm_run_ast_work(res->lr_namespace,
-                                                       &rpc_list);
-                                l_lock(&ns->ns_lock);
+                                ldlm_reprocess_queue(res, &res->lr_waiting,
+                                                     &rpc_list);
+
+                                unlock_res(res);
+                                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
+                                lock_res(res);
                                 if (rc == -ERESTART)
                                         GOTO(restart, -ERESTART);
                         }
                 } else {
                         LASSERT(req->l_completion_ast);
-                        ldlm_add_ast_work_item(req, NULL, NULL, 0);
+                        ldlm_add_ast_work_item(req, NULL, work_list);
                 }
         }
 
@@ -412,7 +417,7 @@ restart:
         if (added)
                 ldlm_flock_destroy(req, mode, *flags);
 
-        ldlm_resource_dump(res);
+        ldlm_resource_dump(D_OTHER, res);
         RETURN(LDLM_ITER_CONTINUE);
 }
 
@@ -434,9 +439,15 @@ ldlm_flock_interrupted_wait(void *data)
 
         /* take lock off the deadlock detection waitq. */
         list_del_init(&lock->l_flock_waitq);
+
+        /* client side - set flag to prevent lock from being put on lru list */
+        lock->l_flags |= LDLM_FL_CBPENDING;
+
         ldlm_lock_decref_internal(lock, lock->l_req_mode);
         ldlm_lock2handle(lock, &lockh);
         rc = ldlm_cli_cancel(&lockh);
+        if (rc != ELDLM_OK)
+                CERROR("ldlm_cli_cancel: %d\n", rc);
+
         EXIT;
 }
 
@@ -444,9 +455,8 @@ int
 ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
         struct ldlm_namespace *ns;
-        struct file_lock *getlk = lock->l_ast_data;
+        cfs_flock_t *getlk = lock->l_ast_data;
         struct ldlm_flock_wait_data fwd;
-        unsigned long irqflags;
         struct obd_device *obd;
         struct obd_import *imp = NULL;
         ldlm_error_t err;
@@ -459,11 +469,6 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 
         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
 
-        if (flags == 0) {
-                wake_up(&lock->l_waitq);
-                RETURN(0);
-        }
-
         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                        LDLM_FL_BLOCK_CONV)))
                 goto granted;
@@ -471,8 +476,6 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                    "sleeping");
 
-        ldlm_lock_dump(D_DLMTRACE, lock, 0);
-
         fwd.fwd_lock = lock;
         obd = class_exp2obd(lock->l_conn_export);
 
@@ -481,9 +484,9 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 imp = obd->u.cli.cl_import;
 
         if (imp != NULL) {
-                spin_lock_irqsave(&imp->imp_lock, irqflags);
+                spin_lock(&imp->imp_lock);
                 fwd.fwd_generation = imp->imp_generation;
-                spin_unlock_irqrestore(&imp->imp_lock, irqflags);
+                spin_unlock(&imp->imp_lock);
         }
 
         lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
@@ -493,19 +496,14 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                           ((lock->l_req_mode == lock->l_granted_mode) ||
                            lock->l_destroyed), &lwi);
 
-        if (rc) {
-                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
-                           rc);
-                RETURN(rc);
-        }
-
-        LASSERT(!(lock->l_destroyed));
+        LDLM_DEBUG(lock, "client-side enqueue waking up: rc = %d", rc);
+        RETURN(rc);
 
 granted:
-        LDLM_DEBUG(lock, "client-side enqueue waking up");
+        LDLM_DEBUG(lock, "client-side enqueue granted");
 
         ns = lock->l_resource->lr_namespace;
-        l_lock(&ns->ns_lock);
+        lock_res(lock->l_resource);
 
         /* take lock off the deadlock detection waitq. */
         list_del_init(&lock->l_flock_waitq);
@@ -517,29 +515,33 @@ granted:
                 /* fcntl(F_GETLK) request */
                 /* The old mode was saved in getlk->fl_type so that if the mode
                  * in the lock changes we can decref the approprate refcount. */
-                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
+                ldlm_flock_destroy(lock, cfs_flock_type(getlk), LDLM_FL_WAIT_NOREPROC);
                 switch (lock->l_granted_mode) {
                 case LCK_PR:
-                        getlk->fl_type = F_RDLCK;
+                        cfs_flock_set_type(getlk, F_RDLCK);
                         break;
                 case LCK_PW:
-                        getlk->fl_type = F_WRLCK;
+                        cfs_flock_set_type(getlk, F_WRLCK);
                         break;
                 default:
-                        getlk->fl_type = F_UNLCK;
+                        cfs_flock_set_type(getlk, F_UNLCK);
                 }
-                getlk->fl_pid = lock->l_policy_data.l_flock.pid;
-                getlk->fl_start = lock->l_policy_data.l_flock.start;
-                getlk->fl_end = lock->l_policy_data.l_flock.end;
+                cfs_flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
+                cfs_flock_set_start(getlk, (loff_t)lock->l_policy_data.l_flock.start);
+                cfs_flock_set_end(getlk, (loff_t)lock->l_policy_data.l_flock.end);
         } else {
+                int noreproc = LDLM_FL_WAIT_NOREPROC;
+
                 /* We need to reprocess the lock to do merges or splits
                  * with existing locks owned by this process. */
-                flags = LDLM_FL_WAIT_NOREPROC;
-                ldlm_process_flock_lock(lock, &flags, 1, &err);
+                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
+                if (flags == 0)
+                        cfs_waitq_signal(&lock->l_waitq);
         }
-        l_unlock(&ns->ns_lock);
+        unlock_res(lock->l_resource);
         RETURN(0);
 }
+EXPORT_SYMBOL(ldlm_flock_completion_ast);
 
 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                             void *data, int flag)
@@ -551,10 +553,10 @@ int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         LASSERT(flag == LDLM_CB_CANCELING);
 
         ns = lock->l_resource->lr_namespace;
-        
+
         /* take lock off the deadlock detection waitq. */
-        l_lock(&ns->ns_lock);
+        lock_res_and_lock(lock);
         list_del_init(&lock->l_flock_waitq);
-        l_unlock(&ns->ns_lock);
+        unlock_res_and_lock(lock);
         RETURN(0);
 }
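
Note on the F_GETLK path changed above: the granted branch of ldlm_flock_completion_ast() answers an fcntl(F_GETLK) test by mapping the granted LDLM mode back to a POSIX lock type (LCK_PR to F_RDLCK, LCK_PW to F_WRLCK, anything else to F_UNLCK) and copying the owner pid and byte range out of l_policy_data.l_flock, now through the cfs_flock_set_*() accessors. The sketch below is illustrative only: it shows the user-visible side of that exchange using plain POSIX fcntl(), not any Lustre API, and the file path is hypothetical.

/* Minimal user-space sketch of an F_GETLK test request, the operation the
 * granted path above services on a Lustre client.  Standard POSIX only;
 * "/mnt/lustre/testfile" is a made-up path. */
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
        struct flock fl;
        int fd = open("/mnt/lustre/testfile", O_RDWR | O_CREAT, 0644);

        if (fd < 0) {
                perror("open");
                return 1;
        }

        /* Ask "what would block a write lock on bytes 0..99?" */
        memset(&fl, 0, sizeof(fl));
        fl.l_type = F_WRLCK;            /* mode we want to test */
        fl.l_whence = SEEK_SET;
        fl.l_start = 0;
        fl.l_len = 100;

        if (fcntl(fd, F_GETLK, &fl) == -1) {
                perror("fcntl(F_GETLK)");
                close(fd);
                return 1;
        }

        /* On return l_type is F_UNLCK if nothing conflicts; otherwise it
         * holds the conflicting lock's type, owner pid and range - the
         * fields the completion AST fills in from l_policy_data.l_flock. */
        if (fl.l_type == F_UNLCK)
                printf("range is unlocked\n");
        else
                printf("conflict: %s lock, pid %d, start %lld, len %lld\n",
                       fl.l_type == F_RDLCK ? "read" : "write",
                       (int)fl.l_pid,
                       (long long)fl.l_start, (long long)fl.l_len);

        close(fd);
        return 0;
}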