From 1dab27bede07c2c856e23318a55dc1a9fbd2c1b9 Mon Sep 17 00:00:00 2001 From: pschwan Date: Sun, 27 Oct 2002 03:03:50 +0000 Subject: [PATCH] most of the work for lock replay: - add a dlm replay flag that: - prevents the calling of policy functions - tells the server to trust the client's judgement - a function to call ldlm_cli_enqueue for replay next up: - something to help replay _all_ locks in a given namespace, probably in the form of some generic ldlm_namespace_foreach thing --- lustre/include/linux/lustre_dlm.h | 5 +- lustre/ldlm/ldlm_lock.c | 20 +++--- lustre/ldlm/ldlm_request.c | 130 +++++++++++++++++++++++++++----------- lustre/llite/file.c | 4 +- lustre/mdc/mdc_request.c | 2 +- lustre/mds/handler.c | 2 +- lustre/mds/mds_reint.c | 10 +-- 7 files changed, 116 insertions(+), 57 deletions(-) diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index c07d6ff..791f425 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -39,6 +39,7 @@ typedef enum { #define LDLM_FL_DESTROYED (1 << 6) #define LDLM_FL_WAIT_NOREPROC (1 << 7) #define LDLM_FL_CANCEL (1 << 8) +#define LDLM_FL_REPLAY (1 << 9) #define LDLM_CB_BLOCKING 1 #define LDLM_CB_CANCELING 2 @@ -144,8 +145,6 @@ struct ldlm_lock { struct lustre_handle l_remote_handle; void *l_data; __u32 l_data_len; - void *l_cookie; - int l_cookie_len; struct ldlm_extent l_extent; __u32 l_version[RES_VERSION_SIZE]; @@ -308,7 +307,7 @@ void ldlm_lock_put(struct ldlm_lock *lock); void ldlm_lock_destroy(struct ldlm_lock *lock); void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc); void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode); -void ldlm_lock_addref_internal(struct ldlm_lock* , __u32 mode); +void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode); void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode); void ldlm_grant_lock(struct ldlm_lock *lock); int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type, diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index a361cab..7e0e93c 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -564,7 +564,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, continue; /* lock_convert() takes the resource lock, so we're sure that - * req_mode, lr_type, and l_cookie won't change beneath us */ + * req_mode and lr_type won't change beneath us */ if (lock->l_req_mode != mode) continue; @@ -673,9 +673,10 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock, if (res->lr_type == LDLM_EXTENT) memcpy(&lock->l_extent, cookie, sizeof(lock->l_extent)); - /* policies are not executed on the client */ + /* policies are not executed on the client or during replay */ local = res->lr_namespace->ns_client; - if (!local && (policy = ldlm_res_policy_table[res->lr_type])) { + if (!local && !(*flags & LDLM_FL_REPLAY) && + (policy = ldlm_res_policy_table[res->lr_type])) { int rc; rc = policy(lock, cookie, lock->l_req_mode, NULL); @@ -688,9 +689,6 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock, } } - lock->l_cookie = cookie; - lock->l_cookie_len = cookie_len; - l_lock(&res->lr_namespace->ns_lock); if (local && lock->l_req_mode == lock->l_granted_mode) { /* The server returned a blocked lock, but it was granted before @@ -705,9 +703,15 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock, * namespace only has information about locks taken by that client, and * thus doesn't have enough information to decide for itself if it can * be granted (below). In this case, we do exactly what the server - * tells us to do, as dictated by the 'flags' */ + * tells us to do, as dictated by the 'flags'. + * + * We do exactly the same thing during recovery, when the server is + * more or less trusting the clients not to lie. + * + * FIXME (bug 629283): Detect obvious lies by checking compatibility in + * granted/converting queues. */ ldlm_resource_unlink_lock(lock); - if (local) { + if (local || (*flags & LDLM_FL_REPLAY)) { if (*flags & LDLM_FL_BLOCK_CONV) ldlm_resource_add_lock(res, res->lr_converting.prev, lock); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 4d16f31..4117b67 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -169,24 +169,37 @@ int ldlm_cli_enqueue(struct lustre_handle *connh, struct ldlm_lock *lock; struct ldlm_request *body; struct ldlm_reply *reply; - int rc, size = sizeof(*body), req_passed_in = 1; + int rc, size = sizeof(*body), req_passed_in = 1, is_replay; ENTRY; + is_replay = *flags & LDLM_FL_REPLAY; + LASSERT(connh != NULL || !is_replay); + if (connh == NULL) return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id, type, cookie, cookielen, mode, flags, completion, blocking, data, data_len, lockh); - *flags = 0; - lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode, - data, data_len); - if (lock == NULL) - GOTO(out_nolock, rc = -ENOMEM); - LDLM_DEBUG(lock, "client-side enqueue START"); - /* for the local lock, add the reference */ - ldlm_lock_addref_internal(lock, mode); - ldlm_lock2handle(lock, lockh); + /* If we're replaying this lock, just check some invariants. + * If we're creating a new lock, get everything all setup nice. */ + if (is_replay) { + lock = ldlm_handle2lock(lockh); + LDLM_DEBUG(lock, "client-side enqueue START"); + LASSERT(connh == lock->l_connh); + } else { + lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, + mode, data, data_len); + if (lock == NULL) + GOTO(out_nolock, rc = -ENOMEM); + LDLM_DEBUG(lock, "client-side enqueue START"); + /* for the local lock, add the reference */ + ldlm_lock_addref_internal(lock, mode); + ldlm_lock2handle(lock, lockh); + if (type == LDLM_EXTENT) + memcpy(&lock->l_extent, cookie, + sizeof(body->lock_desc.l_extent)); + } if (req == NULL) { req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1, @@ -197,17 +210,9 @@ int ldlm_cli_enqueue(struct lustre_handle *connh, } else if (req->rq_reqmsg->buflens[0] != sizeof(*body)) LBUG(); - /* Dump all of this data into the request buffer */ + /* Dump lock data into the request buffer */ body = lustre_msg_buf(req->rq_reqmsg, 0); ldlm_lock2desc(lock, &body->lock_desc); - /* Phil: make this part of ldlm_lock2desc */ - if (type == LDLM_EXTENT) { - memcpy(&body->lock_desc.l_extent, cookie, - sizeof(body->lock_desc.l_extent)); - CDEBUG(D_INFO, "extent in body: "LPU64" -> "LPU64"\n", - body->lock_desc.l_extent.start, - body->lock_desc.l_extent.end); - } body->lock_flags = *flags; memcpy(&body->lock_handle1, lockh, sizeof(*lockh)); @@ -223,11 +228,12 @@ int ldlm_cli_enqueue(struct lustre_handle *connh, lock->l_connh = connh; lock->l_export = NULL; + LDLM_DEBUG(lock, "sending request"); rc = ptlrpc_queue_wait(req); - /* FIXME: status check here? */ rc = ptlrpc_check_status(req, rc); if (rc != ELDLM_OK) { + LASSERT(!is_replay); LDLM_DEBUG(lock, "client-side enqueue END (%s)", rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED"); ldlm_lock_decref(lockh, mode); @@ -240,22 +246,26 @@ int ldlm_cli_enqueue(struct lustre_handle *connh, reply = lustre_msg_buf(req->rq_repmsg, 0); memcpy(&lock->l_remote_handle, &reply->lock_handle, sizeof(lock->l_remote_handle)); - if (type == LDLM_EXTENT) - memcpy(cookie, &reply->lock_extent, sizeof(reply->lock_extent)); *flags = reply->lock_flags; CDEBUG(D_INFO, "remote handle: %p, flags: %d\n", (void *)(unsigned long)reply->lock_handle.addr, *flags); - CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got extent " - LPU64" -> "LPU64"\n", - body->lock_desc.l_extent.start, body->lock_desc.l_extent.end, - reply->lock_extent.start, reply->lock_extent.end); + if (type == LDLM_EXTENT) { + CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got " + "extent "LPU64" -> "LPU64"\n", + body->lock_desc.l_extent.start, + body->lock_desc.l_extent.end, + reply->lock_extent.start, reply->lock_extent.end); + cookie = &reply->lock_extent; /* FIXME bug 629281 */ + cookielen = sizeof(reply->lock_extent); + } /* If enqueue returned a blocked lock but the completion handler has * already run, then it fixed up the resource and we don't need to do it * again. */ if ((*flags) & LDLM_FL_LOCK_CHANGED) { int newmode = reply->lock_mode; + LASSERT(!is_replay); if (newmode && newmode != lock->l_req_mode) { LDLM_DEBUG(lock, "server returned different mode %s", ldlm_lockname[newmode]); @@ -282,10 +292,12 @@ int ldlm_cli_enqueue(struct lustre_handle *connh, if (!req_passed_in) ptlrpc_req_finished(req); - rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, - blocking); - if (lock->l_completion_ast) - lock->l_completion_ast(lock, *flags); + if (!is_replay) { + rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, + completion, blocking); + if (lock->l_completion_ast) + lock->l_completion_ast(lock, *flags); + } LDLM_DEBUG(lock, "client-side enqueue END"); EXIT; @@ -325,10 +337,20 @@ int ldlm_match_or_enqueue(struct lustre_handle *connh, RETURN(0); } +int ldlm_cli_replay_enqueue(struct ldlm_lock *lock) +{ + struct lustre_handle lockh; + int flags = LDLM_FL_REPLAY; + ldlm_lock2handle(lock, &lockh); + return ldlm_cli_enqueue(lock->l_connh, NULL, NULL, NULL, NULL, + lock->l_resource->lr_type, NULL, 0, -1, &flags, + NULL, NULL, NULL, 0, &lockh); +} + static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, int *flags) { - + ENTRY; if (lock->l_resource->lr_namespace->ns_client) { CERROR("Trying to cancel local lock\n"); LBUG(); @@ -365,7 +387,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) connh = lock->l_connh; if (!connh) - return ldlm_cli_convert_local(lock, new_mode, flags); + RETURN(ldlm_cli_convert_local(lock, new_mode, flags)); LDLM_DEBUG(lock, "client-side convert"); @@ -470,12 +492,8 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) return rc; } -/* Cancel all locks on a given resource that have 0 readers/writers. - * - * If 'local_only' is true, throw the locks away without trying to notify the - * server. */ -int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id, - int local_only) +static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, + __u64 *res_id, int local_only) { struct ldlm_resource *res; struct list_head *tmp, *next, list = LIST_HEAD_INIT(list); @@ -534,3 +552,39 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id, RETURN(0); } + +/* Cancel all locks on a namespace (or a specific resource, if given) that have + * 0 readers/writers. + * + * If 'local_only' is true, throw the locks away without trying to notify the + * server. */ +int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id, + int local_only) +{ + int i; + + if (res_id) + RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, local_only)); + + l_lock(&ns->ns_lock); + for (i = 0; i < RES_HASH_SIZE; i++) { + struct list_head *tmp, *pos; + list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) { + int rc; + struct ldlm_resource *res; + res = list_entry(tmp, struct ldlm_resource, lr_hash); + ldlm_resource_getref(res); + + rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name, + local_only); + + if (rc) + CERROR("cancel_unused_res ("LPU64"): %d\n", + res->lr_name[0], rc); + ldlm_resource_put(res); + } + } + l_unlock(&ns->ns_lock); + + return ELDLM_OK; +} diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 9429b79..7778e01 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -415,7 +415,6 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, struct ll_file_data *fd = (struct ll_file_data *)filp->private_data; struct inode *inode = filp->f_dentry->d_inode; struct ll_sb_info *sbi = ll_i2sbi(inode); - struct ldlm_extent extent; struct lustre_handle *lockhs = NULL; struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; int flags = 0; @@ -425,6 +424,7 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) && !(sbi->ll_flags & LL_SBI_NOLCK)) { + struct ldlm_extent extent; OBD_ALLOC(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs)); if (!lockhs) RETURN(-ENOMEM); @@ -475,7 +475,6 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) struct ll_file_data *fd = (struct ll_file_data *)file->private_data; struct inode *inode = file->f_dentry->d_inode; struct ll_sb_info *sbi = ll_i2sbi(inode); - struct ldlm_extent extent; struct lustre_handle *lockhs = NULL, *eof_lockhs = NULL; struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; int flags = 0; @@ -514,6 +513,7 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) && !(sbi->ll_flags & LL_SBI_NOLCK)) { + struct ldlm_extent extent; OBD_ALLOC(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs)); if (!lockhs) GOTO(out_eof, retval = -ENOMEM); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 1f9c07b..17787276 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -205,7 +205,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, struct obd_device *obddev = class_conn2obd(conn); __u64 res_id[RES_NAME_SIZE] = {dir->i_ino}; int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)}; - int rc, flags; + int rc, flags = 0; int repsize[3] = {sizeof(struct ldlm_reply), sizeof(struct mds_body), obddev->u.cli.cl_max_mds_easize}; diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 0419a75..4629542 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -557,7 +557,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) struct inode *dir; struct lustre_handle lockh; char *name; - int namelen, flags, lock_mode, rc = 0; + int namelen, flags = 0, lock_mode, rc = 0; struct obd_ucred uc; __u64 res_id[3] = {0, 0, 0}; ENTRY; diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 60ea97a..45cd424 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -127,7 +127,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, inode = de->d_inode; CDEBUG(D_INODE, "ino %ld\n", inode->i_ino); - OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, + OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, to_kdev_t(inode->i_sb->s_dev)); handle = mds_fs_start(mds, inode, MDS_FSOP_SETATTR); @@ -413,7 +413,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, "child missing (%ld/%s); OK for REPLAYING\n", dir->i_ino, rec->ur_name); rc = 0; - } else { + } else { CDEBUG(D_INODE, "child doesn't exist (dir %ld, name %s)\n", dir->i_ino, rec->ur_name); @@ -432,7 +432,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, mds_pack_inode2body(body, inode); } - OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, + OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, to_kdev_t(dir->i_sb->s_dev)); switch (rec->ur_mode /* & S_IFMT ? */) { @@ -655,7 +655,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, struct dentry *de_new = NULL; struct mds_obd *mds = mds_req2mds(req); struct lustre_handle tgtlockh, srclockh, oldhandle; - int flags, lock_mode, rc = 0, err; + int flags = 0, lock_mode, rc = 0, err; void *handle; __u64 res_id[3] = { 0 }; ENTRY; @@ -692,6 +692,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN, NULL, 0, lock_mode, &tgtlockh); if (rc == 0) { + flags = 0; LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]); rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, @@ -773,6 +774,7 @@ out_rename_denew: out_rename_deold: if (!rc) { res_id[0] = de_old->d_inode->i_ino; + flags = 0; /* Take an exclusive lock on the resource that we're * about to free, to force everyone to drop their * locks. */ -- 1.8.3.1