From: Mikhail Pershin Date: Wed, 23 May 2012 19:49:51 +0000 (+0400) Subject: LU-1406 ofd: add DLM and LVB code X-Git-Tag: 2.2.57~13 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=6c19f5437f2b089b8f27c3855408d098b01e63d9 LU-1406 ofd: add DLM and LVB code OFD dlm and lvb functions Signed-off-by: Mikhail Pershin Change-Id: Ieddd2b16129f62c6e380d007aaa1f2de70e8a9ca Reviewed-on: http://review.whamcloud.com/2892 Tested-by: Hudson Reviewed-by: Andreas Dilger Tested-by: Maloo Reviewed-by: Alex Zhuravlev --- diff --git a/lustre/ofd/Makefile.in b/lustre/ofd/Makefile.in index aa73c81..15f0c46 100644 --- a/lustre/ofd/Makefile.in +++ b/lustre/ofd/Makefile.in @@ -1,7 +1,7 @@ MODULES := ofd ofd-objs := ofd_dev.o ofd_obd.o ofd_fs.o ofd_trans.o ofd_objects.o -ofd-objs += lproc_ofd.o ofd_capa.o ofd_fmd.o ofd_grant.o +ofd-objs += lproc_ofd.o ofd_capa.o ofd_fmd.o ofd_grant.o ofd_dlm.o ofd_lvb.o EXTRA_DIST = $(ofd-objs:%.o=%.c) ofd_internal.h diff --git a/lustre/ofd/ofd_dev.c b/lustre/ofd/ofd_dev.c index 4c816df..9297ec3 100644 --- a/lustre/ofd/ofd_dev.c +++ b/lustre/ofd/ofd_dev.c @@ -517,6 +517,12 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m, GOTO(err_fini_stack, rc = -ENOMEM); /* set obd_namespace for compatibility with old code */ obd->obd_namespace = m->ofd_namespace; + ldlm_register_intent(m->ofd_namespace, ofd_intent_policy); + m->ofd_namespace->ns_lvbo = &ofd_lvbo; + m->ofd_namespace->ns_lvbp = m; + + ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, + "filter_ldlm_cb_client", &obd->obd_ldlm_client); dt_conf_get(env, m->ofd_osd, &m->ofd_dt_conf); diff --git a/lustre/ofd/ofd_dlm.c b/lustre/ofd/ofd_dlm.c new file mode 100644 index 0000000..34790c5 --- /dev/null +++ b/lustre/ofd/ofd_dlm.c @@ -0,0 +1,241 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/ofd/ofd_dlm.c
+ *
+ * Author: Mike Pershin
+ * Author: Alex Zhuravlev
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include "ofd_internal.h"
+/*
+ * Argument bundle handed to ofd_intent_cb() through the opaque pointer of
+ * interval_iterate_reverse().
+ *
+ * victim    - in/out: where the callback stores the lock chosen for the
+ *             glimpse callback; the stored lock carries a reference taken
+ *             with LDLM_LOCK_GET().
+ * size      - in: size value the callback compares interval upper bounds
+ *             against (the caller passes the LVB size).
+ * liblustre - in/out: flag the callback clears once it has seen a lock whose
+ *             export is non-NULL and not marked exp_libclient.
+ */
+struct ofd_intent_args {
+	struct ldlm_lock **victim;
+	__u64 size;
+	int *liblustre;
+};
+/*
+ * Interval-tree iteration callback used by ofd_intent_policy().
+ *
+ * Stops the walk (INTERVAL_ITER_STOP) as soon as an interval whose upper
+ * bound is not above arg->size is reached. Otherwise it looks at the locks
+ * in the node's li_group, skips those with no export or with a liblustre
+ * export, and considers the first remaining one: it becomes *victim if no
+ * victim is recorded yet or if the recorded victim's extent starts lower.
+ * A replaced victim is released before the new reference is taken.
+ *
+ * Returns INTERVAL_ITER_CONT in every other case so the walk goes on.
+ */
+static enum interval_iter ofd_intent_cb(struct interval_node *n, void *args)
+{
+	struct ldlm_interval *node = (struct ldlm_interval *)n;
+	struct ofd_intent_args *arg = args;
+	__u64 size = arg->size;
+	struct ldlm_lock **v = arg->victim;
+	struct ldlm_lock *lck;
+
+	/* If the interval is lower than the current file size, just break. */
+	if (interval_high(n) <= size)
+		return INTERVAL_ITER_STOP;
+
+	cfs_list_for_each_entry(lck, &node->li_group, l_sl_policy) {
+		/* Don't send glimpse ASTs to liblustre clients.
+		 * They aren't listening for them, and they do
+		 * entirely synchronous I/O anyways. */
+		if (lck->l_export == NULL || lck->l_export->exp_libclient)
+			continue;
+
+		/* A non-liblustre holder exists: tell the caller. */
+		if (*arg->liblustre)
+			*arg->liblustre = 0;
+
+		/* Keep the candidate whose extent starts highest; the
+		 * reference on a displaced candidate is dropped here. */
+		if (*v == NULL) {
+			*v = LDLM_LOCK_GET(lck);
+		} else if ((*v)->l_policy_data.l_extent.start <
+			   lck->l_policy_data.l_extent.start) {
+			LDLM_LOCK_RELEASE(*v);
+			*v = LDLM_LOCK_GET(lck);
+		}
+
+		/* the same policy group - every lock has the
+		 * same extent, so needn't do it any more */
+		break;
+	}
+
+	return INTERVAL_ITER_CONT;
+}
+/*
+ * Intent policy for the OFD namespace (glimpse handling).
+ *
+ * A three-buffer reply (ptlrpc body, struct ldlm_reply, struct ost_lvb) is
+ * packed first; if that fails the error is stored in req->rq_status and
+ * returned. The resource's processing policy is then run under lock_res():
+ *
+ *  - policy returned LDLM_ITER_CONTINUE: return ELDLM_LOCK_REPLACED, unless
+ *    the requesting export is a liblustre client or the
+ *    OBD_FAIL_LDLM_GLIMPSE fail point fires, in which case the lock is
+ *    unlinked from the resource and ELDLM_LOCK_ABORTED is returned;
+ *  - otherwise, with LDLM_FL_BLOCK_NOWAIT in flags: ELDLM_LOCK_ABORTED, no
+ *    glimpse callback is sent;
+ *  - otherwise: the resource LVB is copied into the reply, the interval
+ *    trees of every mode except LCK_PR are walked in reverse with
+ *    ofd_intent_cb() to pick a victim lock, the victim's l_glimpse_ast is
+ *    invoked, the LVB is copied into the reply again, and
+ *    ELDLM_LOCK_ABORTED is returned. If the victim has no l_glimpse_ast,
+ *    rep->lock_policy_res1 is set to -ENOENT instead.
+ *
+ * The mode and data parameters are not used by this function, and the value
+ * returned by the glimpse AST is stored in rc but not examined afterwards.
+ */
+int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
+		      void *req_cookie, ldlm_mode_t mode, int flags,
+		      void *data)
+{
+	struct ptlrpc_request *req = req_cookie;
+	struct ldlm_lock *lock = *lockp, *l = NULL;
+	struct ldlm_resource *res = lock->l_resource;
+	ldlm_processing_policy policy;
+	struct ost_lvb *res_lvb, *reply_lvb;
+	struct ldlm_reply *rep;
+	ldlm_error_t err;
+	int idx, rc;
+	int tmpflags = 0, only_liblustre = 1;
+	struct ldlm_interval_tree *tree;
+	struct ofd_intent_args arg;
+	__u32 repsize[3] = {
+		[MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+		[DLM_LOCKREPLY_OFF] = sizeof(*rep),
+		[DLM_REPLY_REC_OFF] = sizeof(*reply_lvb)
+	};
+
+	ENTRY;
+
+	policy = ldlm_get_processing_policy(res);
+	LASSERT(policy != NULL);
+	LASSERT(req != NULL);
+
+	rc = lustre_pack_reply(req, 3, repsize, NULL);
+	if (rc)
+		RETURN(req->rq_status = rc);
+
+	rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep));
+	LASSERT(rep != NULL);
+
+	reply_lvb = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
+				   sizeof(*reply_lvb));
+	LASSERT(reply_lvb != NULL);
+
+	/* Call the extent policy function to see if our request can be
+	 * granted, or is blocked.
+	 * If the OST lock has LDLM_FL_HAS_INTENT set, it means a glimpse
+	 * lock, and should not be granted if the lock will be blocked.
+	 */
+
+	/* Fault-injection hooks for the LDLM_FL_BLOCK_NOWAIT path. */
+	if (flags & LDLM_FL_BLOCK_NOWAIT) {
+		OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_AGL_DELAY, 5);
+
+		if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_AGL_NOLOCK))
+			RETURN(ELDLM_LOCK_ABORTED);
+	}
+
+	LASSERT(ns == ldlm_res_to_ns(res));
+	lock_res(res);
+	rc = policy(lock, &tmpflags, 0, &err, NULL);
+	check_res_locked(res);
+
+	/* The lock met with no resistance; we're finished. */
+	if (rc == LDLM_ITER_CONTINUE) {
+		/* do not grant locks to the liblustre clients: they cannot
+		 * handle ASTs robustly. We need to do this while still
+		 * holding ns_lock to avoid the lock remaining on the res_link
+		 * list (and potentially being added to l_pending_list by an
+		 * AST) when we are going to drop this lock ASAP.
+		 *
+		 * NOTE(review): the lock held at this point is the one taken
+		 * by lock_res(res) above; "ns_lock" looks like wording kept
+		 * from older code - confirm. */
+		if (lock->l_export->exp_libclient ||
+		    OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2)) {
+			ldlm_resource_unlink_lock(lock);
+			err = ELDLM_LOCK_ABORTED;
+		} else {
+			err = ELDLM_LOCK_REPLACED;
+		}
+		unlock_res(res);
+		RETURN(err);
+	} else if (flags & LDLM_FL_BLOCK_NOWAIT) {
+		/* LDLM_FL_BLOCK_NOWAIT means it is for AGL. Do not send glimpse
+		 * callback for glimpse size. The real size user will trigger
+		 * the glimpse callback when necessary. */
+		unlock_res(res);
+		RETURN(ELDLM_LOCK_ABORTED);
+	}
+
+	/* Do not grant any lock, but instead send GL callbacks. The extent
+	 * policy nicely created a list of all PW locks for us. We will choose
+	 * the highest of those which are larger than the size in the LVB, if
+	 * any, and perform a glimpse callback. */
+	res_lvb = res->lr_lvb_data;
+	LASSERT(res_lvb != NULL);
+	*reply_lvb = *res_lvb;
+
+	/*
+	 * ->ns_lock guarantees that no new locks are granted, and,
+	 * therefore, that res->lr_lvb_data cannot increase beyond the
+	 * end of already granted lock. As a result, it is safe to
+	 * check against "stale" reply_lvb->lvb_size value without
+	 * res->lr_lvb_sem.
+	 *
+	 * NOTE(review): the lock held here is the resource lock from
+	 * lock_res(res); the ns_lock / lr_lvb_sem names appear to be
+	 * inherited wording - confirm against the current ldlm_resource.
+	 */
+	arg.size = reply_lvb->lvb_size;
+	arg.victim = &l;
+	arg.liblustre = &only_liblustre;
+
+	/* Walk every per-mode interval tree except the LCK_PR one. */
+	for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+		tree = &res->lr_itree[idx];
+		if (tree->lit_mode == LCK_PR)
+			continue;
+
+		interval_iterate_reverse(tree->lit_root, ofd_intent_cb, &arg);
+	}
+	unlock_res(res);
+
+	/* There were no PW locks beyond the size in the LVB; finished. */
+	if (l == NULL) {
+		if (only_liblustre) {
+			/* If we discovered a liblustre client with a PW lock,
+			 * however, the LVB may be out of date! The LVB is
+			 * updated only on glimpse (which we don't do for
+			 * liblustre clients) and cancel (which the client
+			 * obviously has not yet done). So if it has written
+			 * data but kept the lock, the LVB is stale and needs
+			 * to be updated from disk.
+			 *
+			 * Of course, this will all disappear when we switch to
+			 * taking liblustre locks on the OST.
+			 *
+			 * NOTE(review): only_liblustre also stays set when
+			 * the callback visited no lock at all, so this
+			 * refresh runs in that case too. */
+			ldlm_res_lvbo_update(res, NULL, 1);
+		}
+		RETURN(ELDLM_LOCK_ABORTED);
+	}
+
+	/*
+	 * This check is for lock taken in ofd_prepare_destroy() that does
+	 * not have l_glimpse_ast set. So the logic is: if there is a lock
+	 * with no l_glimpse_ast set, this object is being destroyed already.
+	 * Hence, if you are grabbing DLM locks on the server, always set
+	 * non-NULL glimpse_ast (e.g., ldlm_request.c:ldlm_glimpse_ast()).
+	 */
+	if (l->l_glimpse_ast == NULL) {
+		/* We are racing with unlink(); just return -ENOENT */
+		rep->lock_policy_res1 = -ENOENT;
+		goto out;
+	}
+
+	/* Cannot fire: the NULL case was handled just above. */
+	LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
+	rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
+
+	/* Re-copy the LVB now that the glimpse callback has run. */
+	lock_res(res);
+	*reply_lvb = *res_lvb;
+	unlock_res(res);
+
+out:
+	/* Drop the reference ofd_intent_cb() took on the victim. */
+	LDLM_LOCK_RELEASE(l);
+
+	RETURN(ELDLM_LOCK_ABORTED);
+}
+
diff --git a/lustre/ofd/ofd_internal.h b/lustre/ofd/ofd_internal.h
index c41f992..f3a1a2c 100644
--- a/lustre/ofd/ofd_internal.h
+++ b/lustre/ofd/ofd_internal.h
@@ -407,6 +407,14 @@ void ofd_fmd_drop(struct obd_export *exp, struct lu_fid *fid);
 #define ofd_fmd_drop(exp, fid) do {} while (0)
 #endif
 
+/* ofd_lvb.c */
+extern struct ldlm_valblock_ops ofd_lvbo;
+
+/* ofd_dlm.c */
+int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
+		      void *req_cookie, ldlm_mode_t mode, int flags,
+		      void *data);
+
 static inline struct ofd_thread_info * ofd_info(const struct lu_env *env)
 {
 	struct ofd_thread_info *info;
@@ -437,6 +445,39 @@ static inline struct ofd_thread_info * ofd_info_init(const struct lu_env *env,
 	return info;
 }
 
+/* The same as osc_build_res_name()
+ *
+ * Fill resname with the LDLM resource name for fid. An IDIF fid is turned
+ * back into an object id with fid_idif_id() and named through
+ * osc_build_res_name() with sequence FID_SEQ_OST_MDT0; any other fid goes
+ * through fid_build_reg_res_name(). */
+static inline void ofd_build_resid(const struct lu_fid *fid,
+				   struct ldlm_res_id *resname)
+{
+	if (fid_is_idif(fid)) {
+		/* get id/seq like ostid_idif_pack() does */
+		osc_build_res_name(fid_idif_id(fid_seq(fid), fid_oid(fid),
+					       fid_ver(fid)),
+				   FID_SEQ_OST_MDT0, resname);
+	} else {
+		/* In the future, where OSTs have FID sequences allocated. */
+		fid_build_reg_res_name(fid, resname);
+	}
+}
+/*
+ * Rebuild a fid from an LDLM resource name.
+ *
+ * When the LUSTRE_RES_ID_VER_OID_OFF slot holds an MDT0 sequence the name is
+ * read as an ost_id (id from the SEQ slot, seq from the VER_OID slot) and
+ * unpacked with fid_ostid_unpack(). Otherwise the SEQ slot becomes f_seq and
+ * the VER_OID slot is split into f_oid (low 32 bits) and f_ver (high 32
+ * bits).
+ */
+static inline void ofd_fid_from_resid(struct lu_fid *fid,
+				      const struct ldlm_res_id *name)
+{
+	/* if seq is FID_SEQ_OST_MDT0 then we have IDIF and resid was built
+	 * using osc_build_res_name function. */
+	if (fid_seq_is_mdt0(name->name[LUSTRE_RES_ID_VER_OID_OFF])) {
+		struct ost_id ostid;
+
+		ostid.oi_id = name->name[LUSTRE_RES_ID_SEQ_OFF];
+		ostid.oi_seq = name->name[LUSTRE_RES_ID_VER_OID_OFF];
+		fid_ostid_unpack(fid, &ostid, 0);
+	} else {
+		fid->f_seq = name->name[LUSTRE_RES_ID_SEQ_OFF];
+		fid->f_oid = (__u32)name->name[LUSTRE_RES_ID_VER_OID_OFF];
+		fid->f_ver = name->name[LUSTRE_RES_ID_VER_OID_OFF] >> 32;
+	}
+}
+
 /* sync on lock cancel is useless when we force a journal flush,
  * and if we enable async journal commit, we should also turn on
  * sync on lock cancel if it is not enabled already. */
diff --git a/lustre/ofd/ofd_lvb.c b/lustre/ofd/ofd_lvb.c
new file mode 100644
index 0000000..208198c
--- /dev/null
+++ b/lustre/ofd/ofd_lvb.c
@@ -0,0 +1,250 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/ofd/ofd_lvb.c
+ *
+ * Author: Mikhail Pershin
+ * Author: Alexey Zhuravlev
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include "ofd_internal.h"
+
+/*
+ * lvbo_free method: release the LVB buffer attached to the resource.
+ *
+ * Frees res->lr_lvb_data (res->lr_lvb_len bytes) if it is set. The pointer
+ * itself is left untouched. Always returns 0.
+ */
+static int ofd_lvbo_free(struct ldlm_resource *res)
+{
+	if (res->lr_lvb_data)
+		OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
+
+	return 0;
+}
+
+/*
+ * lvbo_init method: allocate the resource LVB and fill it from the object's
+ * on-disk attributes (size, blocks, mtime, atime, ctime).
+ *
+ * Called with res->lr_lvb_mutex held (asserted below). Does nothing and
+ * returns 0 when the resource already has LVB data.
+ *
+ * Returns 0 on success or a negative errno. Once the LVB has been allocated
+ * it stays attached to the resource even if the object lookup or attribute
+ * fetch fails; in that case the error is recorded in lvb_blocks through
+ * OST_LVB_SET_ERR().
+ */
+static int ofd_lvbo_init(struct ldlm_resource *res)
+{
+	struct ost_lvb *lvb = NULL;
+	struct ofd_device *ofd;
+	struct ofd_object *fo;
+	struct ofd_thread_info *info;
+	struct lu_env env;
+	int rc = 0;
+
+	ENTRY;
+
+	LASSERT(res);
+	LASSERT_MUTEX_LOCKED(&res->lr_lvb_mutex);
+
+	if (res->lr_lvb_data != NULL)
+		RETURN(0);
+
+	ofd = ldlm_res_to_ns(res)->ns_lvbp;
+	LASSERT(ofd != NULL);
+
+	rc = lu_env_init(&env, LCT_DT_THREAD);
+	if (rc)
+		RETURN(rc);
+
+	/* From here on every exit must go through lu_env_fini(). */
+	OBD_ALLOC_PTR(lvb);
+	if (lvb == NULL)
+		GOTO(out, rc = -ENOMEM);
+
+	res->lr_lvb_data = lvb;
+	res->lr_lvb_len = sizeof(*lvb);
+
+	info = ofd_info_init(&env, NULL);
+	ofd_fid_from_resid(&info->fti_fid, &res->lr_name);
+	fo = ofd_object_find(&env, ofd, &info->fti_fid);
+	if (IS_ERR(fo))
+		GOTO(out, rc = PTR_ERR(fo));
+
+	rc = ofd_attr_get(&env, fo, &info->fti_attr);
+	if (rc)
+		GOTO(out_put, rc);
+
+	lvb->lvb_size = info->fti_attr.la_size;
+	lvb->lvb_blocks = info->fti_attr.la_blocks;
+	lvb->lvb_mtime = info->fti_attr.la_mtime;
+	lvb->lvb_atime = info->fti_attr.la_atime;
+	lvb->lvb_ctime = info->fti_attr.la_ctime;
+
+	CDEBUG(D_DLMTRACE, "res: "LPX64" initial lvb size: "LPX64", "
+	       "mtime: "LPX64", blocks: "LPX64"\n",
+	       res->lr_name.name[0], lvb->lvb_size,
+	       lvb->lvb_mtime, lvb->lvb_blocks);
+
+	EXIT;
+out_put:
+	ofd_object_put(&env, fo);
+out:
+	lu_env_fini(&env);
+	/* lvb is NULL only when its allocation failed; nothing to mark then */
+	if (rc && lvb != NULL)
+		OST_LVB_SET_ERR(lvb->lvb_blocks, rc);
+	/* Don't free lvb data on lookup error */
+	return rc;
+}
+
+/* This will be called in two ways:
+ *
+ * req != NULL : called by the DLM itself after a glimpse callback
+ * req == NULL : called by the ofd after a disk write
+ *
+ * If 'increase_only' is true, don't allow values to move backwards.
+ *
+ * With a request, the LVB is first merged from the RMF_DLM_LVB buffer of the
+ * reply. In both cases it is then merged from the object's on-disk
+ * attributes; lvb_blocks is always taken from disk when it differs. Each
+ * merge is done under lock_res().
+ *
+ * Returns 0 on success, and also when the resource has no LVB (an error is
+ * logged); otherwise the negative errno from lu_env_init(),
+ * ofd_object_find() or ofd_attr_get().
+ */
+static int ofd_lvbo_update(struct ldlm_resource *res,
+			   struct ptlrpc_request *req, int increase_only)
+{
+	struct ofd_device *ofd;
+	struct ofd_object *fo;
+	struct ofd_thread_info *info;
+	struct ost_lvb *lvb;
+	struct lu_env env;
+	int rc = 0;
+
+	ENTRY;
+
+	LASSERT(res != NULL);
+
+	ofd = ldlm_res_to_ns(res)->ns_lvbp;
+	LASSERT(ofd != NULL);
+
+	lvb = res->lr_lvb_data;
+	if (lvb == NULL) {
+		CERROR("%s: no lvb when running lvbo_update, res: "LPU64"!\n",
+		       ofd_obd(ofd)->obd_name, res->lr_name.name[0]);
+		RETURN(0);
+	}
+
+	/* Nothing has been acquired yet, so a failure can return directly. */
+	rc = lu_env_init(&env, LCT_DT_THREAD);
+	if (rc)
+		RETURN(rc);
+
+	info = ofd_info_init(&env, NULL);
+	/* Update the LVB from the network message */
+	if (req != NULL) {
+		struct ost_lvb *rpc_lvb;
+
+		/* XXX update always from reply buffer */
+		rpc_lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
+		if (rpc_lvb == NULL) {
+			/* NOTE(review): the message names lustre_swab_buf,
+			 * but the call that failed is
+			 * req_capsule_server_get(); text kept as is. */
+			CERROR("lustre_swab_buf failed\n");
+			goto disk_update;
+		}
+		lock_res(res);
+		if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) {
+			CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size: "
+			       LPU64" -> "LPU64"\n", res->lr_name.name[0],
+			       lvb->lvb_size, rpc_lvb->lvb_size);
+			lvb->lvb_size = rpc_lvb->lvb_size;
+		}
+		if (rpc_lvb->lvb_mtime > lvb->lvb_mtime || !increase_only) {
+			CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime: "
+			       LPU64" -> "LPU64"\n", res->lr_name.name[0],
+			       lvb->lvb_mtime, rpc_lvb->lvb_mtime);
+			lvb->lvb_mtime = rpc_lvb->lvb_mtime;
+		}
+		if (rpc_lvb->lvb_atime > lvb->lvb_atime || !increase_only) {
+			CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb atime: "
+			       LPU64" -> "LPU64"\n", res->lr_name.name[0],
+			       lvb->lvb_atime, rpc_lvb->lvb_atime);
+			lvb->lvb_atime = rpc_lvb->lvb_atime;
+		}
+		if (rpc_lvb->lvb_ctime > lvb->lvb_ctime || !increase_only) {
+			CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb ctime: "
+			       LPU64" -> "LPU64"\n", res->lr_name.name[0],
+			       lvb->lvb_ctime, rpc_lvb->lvb_ctime);
+			lvb->lvb_ctime = rpc_lvb->lvb_ctime;
+		}
+		unlock_res(res);
+	}
+
+disk_update:
+	/* Update the LVB from the disk inode */
+	ofd_fid_from_resid(&info->fti_fid, &res->lr_name);
+	fo = ofd_object_find(&env, ofd, &info->fti_fid);
+	if (IS_ERR(fo))
+		GOTO(out_env, rc = PTR_ERR(fo));
+
+	rc = ofd_attr_get(&env, fo, &info->fti_attr);
+	if (rc)
+		GOTO(out_obj, rc);
+
+	lock_res(res);
+	if (info->fti_attr.la_size > lvb->lvb_size || !increase_only) {
+		/* %llu needs the explicit cast, as for la_blocks below */
+		CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size from disk: "
+		       LPU64" -> %llu\n", res->lr_name.name[0],
+		       lvb->lvb_size,
+		       (unsigned long long)info->fti_attr.la_size);
+		lvb->lvb_size = info->fti_attr.la_size;
+	}
+
+	if (info->fti_attr.la_mtime > lvb->lvb_mtime || !increase_only) {
+		CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime from disk: "
+		       LPU64" -> "LPU64"\n", res->lr_name.name[0],
+		       lvb->lvb_mtime, info->fti_attr.la_mtime);
+		lvb->lvb_mtime = info->fti_attr.la_mtime;
+	}
+	if (info->fti_attr.la_atime > lvb->lvb_atime || !increase_only) {
+		CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb atime from disk: "
+		       LPU64" -> "LPU64"\n", res->lr_name.name[0],
+		       lvb->lvb_atime, info->fti_attr.la_atime);
+		lvb->lvb_atime = info->fti_attr.la_atime;
+	}
+	if (info->fti_attr.la_ctime > lvb->lvb_ctime || !increase_only) {
+		CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb ctime from disk: "
+		       LPU64" -> "LPU64"\n", res->lr_name.name[0],
+		       lvb->lvb_ctime, info->fti_attr.la_ctime);
+		lvb->lvb_ctime = info->fti_attr.la_ctime;
+	}
+	/* The block count follows the disk value even with increase_only. */
+	if (lvb->lvb_blocks != info->fti_attr.la_blocks) {
+		CDEBUG(D_DLMTRACE,"res: "LPU64" updating lvb blocks from disk: "
+		       LPU64" -> %llu\n", res->lr_name.name[0],
+		       lvb->lvb_blocks,
+		       (unsigned long long)info->fti_attr.la_blocks);
+		lvb->lvb_blocks = info->fti_attr.la_blocks;
+	}
+	unlock_res(res);
+
+out_obj:
+	ofd_object_put(&env, fo);
+out_env:
+	lu_env_fini(&env);
+	return rc;
+}
+
+/* LVB operations installed as ns_lvbo of the OFD namespace. */
+struct ldlm_valblock_ops ofd_lvbo = {
+	.lvbo_init	= ofd_lvbo_init,
+	.lvbo_update	= ofd_lvbo_update,
+	.lvbo_free	= ofd_lvbo_free,
+};