MODULES := ofd
ofd-objs := ofd_dev.o ofd_obd.o ofd_fs.o ofd_trans.o ofd_objects.o
-ofd-objs += lproc_ofd.o ofd_capa.o ofd_fmd.o ofd_grant.o
+ofd-objs += lproc_ofd.o ofd_capa.o ofd_fmd.o ofd_grant.o ofd_dlm.o ofd_lvb.o
EXTRA_DIST = $(ofd-objs:%.o=%.c) ofd_internal.h
GOTO(err_fini_stack, rc = -ENOMEM);
/* set obd_namespace for compatibility with old code */
obd->obd_namespace = m->ofd_namespace;
+ ldlm_register_intent(m->ofd_namespace, ofd_intent_policy);
+ m->ofd_namespace->ns_lvbo = &ofd_lvbo;
+ m->ofd_namespace->ns_lvbp = m;
+
+ ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
+ "filter_ldlm_cb_client", &obd->obd_ldlm_client);
dt_conf_get(env, m->ofd_osd, &m->ofd_dt_conf);
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/ofd/ofd_dlm.c
+ *
+ * Author: Mike Pershin <tappro@whamcloud.com>
+ * Author: Alex Zhuravlev <bzzz@whamcloud.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include "ofd_internal.h"
+
+/*
+ * Argument bundle handed to ofd_intent_cb() through the interval tree
+ * iterator. It is filled in by ofd_intent_policy() before iterating.
+ */
+struct ofd_intent_args {
+ /* in/out: lock picked so far; the callback stores a referenced lock
+  * here (LDLM_LOCK_GET) and releases the one it replaces */
+ struct ldlm_lock **victim;
+ /* in: size threshold; the callback stops iterating once an interval's
+  * high end is not above this value */
+ __u64 size;
+ /* out: cleared to 0 by the callback when it meets a lock whose export
+  * is set and is not flagged exp_libclient */
+ int *liblustre;
+};
+
+/*
+ * Interval tree callback used while choosing a lock to glimpse.
+ *
+ * \a args is a struct ofd_intent_args. Iteration stops as soon as an
+ * interval ends at or below args->size. Otherwise the first lock of the
+ * interval's policy group that belongs to a non-liblustre export is
+ * considered: it becomes the victim if there is none yet, or if its extent
+ * starts after the current victim's extent start. References are handled
+ * with LDLM_LOCK_GET()/LDLM_LOCK_RELEASE().
+ */
+static enum interval_iter ofd_intent_cb(struct interval_node *n, void *args)
+{
+        struct ofd_intent_args *ia = args;
+        struct ldlm_interval *ival = (struct ldlm_interval *)n;
+        struct ldlm_lock **best = ia->victim;
+        struct ldlm_lock *cur;
+
+        /* Nothing at or below the known size is of interest. */
+        if (interval_high(n) <= ia->size)
+                return INTERVAL_ITER_STOP;
+
+        cfs_list_for_each_entry(cur, &ival->li_group, l_sl_policy) {
+                struct ldlm_lock *held;
+
+                /* liblustre clients do not listen for glimpse ASTs and
+                 * do fully synchronous I/O, so skip their locks. */
+                if (cur->l_export == NULL || cur->l_export->exp_libclient)
+                        continue;
+
+                /* At least one regular client holds a lock here. */
+                *ia->liblustre = 0;
+
+                held = *best;
+                if (held == NULL ||
+                    held->l_policy_data.l_extent.start <
+                    cur->l_policy_data.l_extent.start) {
+                        if (held != NULL)
+                                LDLM_LOCK_RELEASE(held);
+                        *best = LDLM_LOCK_GET(cur);
+                }
+
+                /* All locks of one policy group share the same extent,
+                 * so one candidate per interval is enough. */
+                break;
+        }
+
+        return INTERVAL_ITER_CONT;
+}
+
+/*
+ * Intent policy handler for the OFD namespace (installed with
+ * ldlm_register_intent() in the device setup code).
+ *
+ * ns          namespace; asserted to be the namespace of the lock's resource
+ * lockp       only *lockp is read here
+ * req_cookie  the struct ptlrpc_request being served; must not be NULL
+ * mode        not used by this function
+ * flags       only LDLM_FL_BLOCK_NOWAIT is examined
+ * data        not used by this function
+ *
+ * A three-buffer reply (ptlrpc body, ldlm_reply, ost_lvb) is packed first.
+ * The resource's processing policy is then run under the resource lock:
+ *  - if it returns LDLM_ITER_CONTINUE the result is ELDLM_LOCK_REPLACED,
+ *    or ELDLM_LOCK_ABORTED (after unlinking the lock) for exp_libclient
+ *    exports or when the OBD_FAIL_LDLM_GLIMPSE fail point triggers;
+ *  - otherwise, with LDLM_FL_BLOCK_NOWAIT set, ELDLM_LOCK_ABORTED is
+ *    returned without any glimpse callback;
+ *  - otherwise the resource LVB is copied into the reply, the interval
+ *    trees (all but the LCK_PR one) are walked in reverse with
+ *    ofd_intent_cb() to pick a lock, that lock's l_glimpse_ast is called,
+ *    the LVB is copied into the reply again and ELDLM_LOCK_ABORTED is
+ *    returned.
+ *
+ * If lustre_pack_reply() fails its error is stored in req->rq_status and
+ * returned.
+ */
+int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
+ void *req_cookie, ldlm_mode_t mode, int flags,
+ void *data)
+{
+ struct ptlrpc_request *req = req_cookie;
+ struct ldlm_lock *lock = *lockp, *l = NULL;
+ struct ldlm_resource *res = lock->l_resource;
+ ldlm_processing_policy policy;
+ struct ost_lvb *res_lvb, *reply_lvb;
+ struct ldlm_reply *rep;
+ ldlm_error_t err;
+ int idx, rc;
+ int tmpflags = 0, only_liblustre = 1;
+ struct ldlm_interval_tree *tree;
+ struct ofd_intent_args arg;
+ __u32 repsize[3] = {
+ [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+ [DLM_LOCKREPLY_OFF] = sizeof(*rep),
+ [DLM_REPLY_REC_OFF] = sizeof(*reply_lvb)
+ };
+
+ ENTRY;
+
+ policy = ldlm_get_processing_policy(res);
+ LASSERT(policy != NULL);
+ LASSERT(req != NULL);
+
+ rc = lustre_pack_reply(req, 3, repsize, NULL);
+ if (rc)
+ RETURN(req->rq_status = rc);
+
+ rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep));
+ LASSERT(rep != NULL);
+
+ reply_lvb = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
+ sizeof(*reply_lvb));
+ LASSERT(reply_lvb != NULL);
+
+ /* Call the extent policy function to see if our request can be
+ * granted, or is blocked.
+ * If the OST lock has LDLM_FL_HAS_INTENT set, it means a glimpse
+ * lock, and should not be granted if the lock will be blocked.
+ */
+
+ if (flags & LDLM_FL_BLOCK_NOWAIT) {
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_AGL_DELAY, 5);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_AGL_NOLOCK))
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+
+ LASSERT(ns == ldlm_res_to_ns(res));
+ lock_res(res);
+ rc = policy(lock, &tmpflags, 0, &err, NULL);
+ check_res_locked(res);
+
+ /* The lock met with no resistance; we're finished. */
+ if (rc == LDLM_ITER_CONTINUE) {
+ /* do not grant locks to the liblustre clients: they cannot
+ * handle ASTs robustly. We need to do this while still
+ * holding ns_lock to avoid the lock remaining on the res_link
+ * list (and potentially being added to l_pending_list by an
+ * AST) when we are going to drop this lock ASAP. */
+ if (lock->l_export->exp_libclient ||
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2)) {
+ ldlm_resource_unlink_lock(lock);
+ err = ELDLM_LOCK_ABORTED;
+ } else {
+ err = ELDLM_LOCK_REPLACED;
+ }
+ unlock_res(res);
+ RETURN(err);
+ } else if (flags & LDLM_FL_BLOCK_NOWAIT) {
+ /* LDLM_FL_BLOCK_NOWAIT means it is for AGL. Do not send glimpse
+ * callback for glimpse size. The real size user will trigger
+ * the glimpse callback when necessary. */
+ unlock_res(res);
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+
+ /* Do not grant any lock, but instead send GL callbacks. The extent
+ * policy nicely created a list of all PW locks for us. We will choose
+ * the highest of those which are larger than the size in the LVB, if
+ * any, and perform a glimpse callback. */
+ res_lvb = res->lr_lvb_data;
+ LASSERT(res_lvb != NULL);
+ *reply_lvb = *res_lvb;
+
+ /*
+ * ->ns_lock guarantees that no new locks are granted, and,
+ * therefore, that res->lr_lvb_data cannot increase beyond the
+ * end of already granted lock. As a result, it is safe to
+ * check against "stale" reply_lvb->lvb_size value without
+ * res->lr_lvb_sem.
+ */
+ arg.size = reply_lvb->lvb_size;
+ arg.victim = &l;
+ arg.liblustre = &only_liblustre;
+
+ /* Walk every interval tree except the LCK_PR one, highest intervals
+ * first; ofd_intent_cb() leaves a referenced lock in 'l' if it picks
+ * one. */
+ for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+ tree = &res->lr_itree[idx];
+ if (tree->lit_mode == LCK_PR)
+ continue;
+
+ interval_iterate_reverse(tree->lit_root, ofd_intent_cb, &arg);
+ }
+ unlock_res(res);
+
+ /* There were no PW locks beyond the size in the LVB; finished. */
+ if (l == NULL) {
+ if (only_liblustre) {
+ /* If we discovered a liblustre client with a PW lock,
+ * however, the LVB may be out of date! The LVB is
+ * updated only on glimpse (which we don't do for
+ * liblustre clients) and cancel (which the client
+ * obviously has not yet done). So if it has written
+ * data but kept the lock, the LVB is stale and needs
+ * to be updated from disk.
+ *
+ * Of course, this will all disappear when we switch to
+ * taking liblustre locks on the OST. */
+ ldlm_res_lvbo_update(res, NULL, 1);
+ }
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+
+ /*
+ * This check is for lock taken in ofd_prepare_destroy() that does
+ * not have l_glimpse_ast set. So the logic is: if there is a lock
+ * with no l_glimpse_ast set, this object is being destroyed already.
+ * Hence, if you are grabbing DLM locks on the server, always set
+ * non-NULL glimpse_ast (e.g., ldlm_request.c:ldlm_glimpse_ast()).
+ */
+ if (l->l_glimpse_ast == NULL) {
+ /* We are racing with unlink(); just return -ENOENT */
+ rep->lock_policy_res1 = -ENOENT;
+ goto out;
+ }
+
+ /* NOTE(review): this assertion cannot fire, the NULL case was handled
+ * by the branch just above. The value stored in rc below is not
+ * examined afterwards. */
+ LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
+ rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
+
+ /* Re-read the resource LVB into the reply under the resource lock. */
+ lock_res(res);
+ *reply_lvb = *res_lvb;
+ unlock_res(res);
+
+out:
+ /* Drop the reference taken on the chosen lock by ofd_intent_cb(). */
+ LDLM_LOCK_RELEASE(l);
+
+ RETURN(ELDLM_LOCK_ABORTED);
+}
+
#define ofd_fmd_drop(exp, fid) do {} while (0)
#endif
+/* ofd_lvb.c: table of LVB init/update/free operations for the namespace */
+extern struct ldlm_valblock_ops ofd_lvbo;
+
+/* ofd_dlm.c: intent policy handler handed to ldlm_register_intent() */
+int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
+ void *req_cookie, ldlm_mode_t mode, int flags,
+ void *data);
+
static inline struct ofd_thread_info * ofd_info(const struct lu_env *env)
{
struct ofd_thread_info *info;
return info;
}
+/*
+ * Build the LDLM resource name for \a fid into \a resname.
+ * Mirrors osc_build_res_name(): IDIF FIDs are converted to an object id
+ * under FID_SEQ_OST_MDT0, any other FID uses the regular FID resource name.
+ */
+static inline void ofd_build_resid(const struct lu_fid *fid,
+                                   struct ldlm_res_id *resname)
+{
+        __u64 objid;
+
+        if (!fid_is_idif(fid)) {
+                /* In the future, where OSTs have FID sequences allocated. */
+                fid_build_reg_res_name(fid, resname);
+                return;
+        }
+
+        /* get id/seq like ostid_idif_pack() does */
+        objid = fid_idif_id(fid_seq(fid), fid_oid(fid), fid_ver(fid));
+        osc_build_res_name(objid, FID_SEQ_OST_MDT0, resname);
+}
+
+/*
+ * Recover the FID encoded in resource name \a name and store it in \a fid.
+ *
+ * When the second resource word is FID_SEQ_OST_MDT0 the name was produced
+ * by osc_build_res_name() for an IDIF and is unpacked through an ost_id;
+ * otherwise the words hold the sequence and the packed version/oid pair.
+ */
+static inline void ofd_fid_from_resid(struct lu_fid *fid,
+                                      const struct ldlm_res_id *name)
+{
+        __u64 res_seq = name->name[LUSTRE_RES_ID_SEQ_OFF];
+        __u64 res_ver_oid = name->name[LUSTRE_RES_ID_VER_OID_OFF];
+
+        if (fid_seq_is_mdt0(res_ver_oid)) {
+                struct ost_id ostid;
+
+                ostid.oi_id = res_seq;
+                ostid.oi_seq = res_ver_oid;
+                fid_ostid_unpack(fid, &ostid, 0);
+                return;
+        }
+
+        /* low 32 bits carry the oid, high 32 bits the version */
+        fid->f_seq = res_seq;
+        fid->f_oid = (__u32)res_ver_oid;
+        fid->f_ver = res_ver_oid >> 32;
+}
+
/* sync on lock cancel is useless when we force a journal flush,
* and if we enable async journal commit, we should also turn on
* sync on lock cancel if it is not enabled already. */
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/ofd/ofd_lvb.c
+ *
+ * Author: Mikhail Pershin <tappro@whamcloud.com>
+ * Author: Alexey Zhuravlev <bzzz@whamcloud.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include "ofd_internal.h"
+
+/*
+ * lvbo_free operation: release the LVB buffer attached to \a res, if any.
+ * Always returns 0.
+ */
+static int ofd_lvbo_free(struct ldlm_resource *res)
+{
+        if (res->lr_lvb_data == NULL)
+                return 0;
+
+        OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
+        return 0;
+}
+
+/*
+ * lvbo_init operation: allocate the LVB of \a res and fill it from the
+ * attributes of the object the resource name refers to.
+ *
+ * Called with res->lr_lvb_mutex held (asserted below). Does nothing and
+ * returns 0 if the resource already carries LVB data.
+ *
+ * If the object lookup or the attribute read fails, the newly allocated
+ * LVB stays attached to the resource with the error recorded in lvb_blocks
+ * through OST_LVB_SET_ERR(); it is not freed here.
+ *
+ * Returns 0 on success, -ENOMEM if the LVB cannot be allocated, or the
+ * error reported by lu_env_init(), ofd_object_find() or ofd_attr_get().
+ */
+static int ofd_lvbo_init(struct ldlm_resource *res)
+{
+        struct ost_lvb *lvb = NULL;
+        struct ofd_device *ofd;
+        struct ofd_object *fo;
+        struct ofd_thread_info *info;
+        struct lu_env env;
+        int rc = 0;
+
+        ENTRY;
+
+        LASSERT(res);
+        LASSERT_MUTEX_LOCKED(&res->lr_lvb_mutex);
+
+        if (res->lr_lvb_data != NULL)
+                RETURN(0);
+
+        ofd = ldlm_res_to_ns(res)->ns_lvbp;
+        LASSERT(ofd != NULL);
+
+        rc = lu_env_init(&env, LCT_DT_THREAD);
+        if (rc)
+                RETURN(rc);
+
+        OBD_ALLOC_PTR(lvb);
+        if (lvb == NULL)
+                /* the env is already set up: it must be finalized, and
+                 * there is no lvb to record the error in */
+                GOTO(out_env, rc = -ENOMEM);
+
+        res->lr_lvb_data = lvb;
+        res->lr_lvb_len = sizeof(*lvb);
+
+        info = ofd_info_init(&env, NULL);
+        ofd_fid_from_resid(&info->fti_fid, &res->lr_name);
+        fo = ofd_object_find(&env, ofd, &info->fti_fid);
+        if (IS_ERR(fo))
+                GOTO(out_lvb, rc = PTR_ERR(fo));
+
+        rc = ofd_attr_get(&env, fo, &info->fti_attr);
+        if (rc)
+                GOTO(out_put, rc);
+
+        lvb->lvb_size = info->fti_attr.la_size;
+        lvb->lvb_blocks = info->fti_attr.la_blocks;
+        lvb->lvb_mtime = info->fti_attr.la_mtime;
+        lvb->lvb_atime = info->fti_attr.la_atime;
+        lvb->lvb_ctime = info->fti_attr.la_ctime;
+
+        CDEBUG(D_DLMTRACE, "res: "LPX64" initial lvb size: "LPX64", "
+               "mtime: "LPX64", blocks: "LPX64"\n",
+               res->lr_name.name[0], lvb->lvb_size,
+               lvb->lvb_mtime, lvb->lvb_blocks);
+
+        EXIT;
+out_put:
+        ofd_object_put(&env, fo);
+out_lvb:
+        /* Don't free lvb data on lookup error, just mark it */
+        if (rc)
+                OST_LVB_SET_ERR(lvb->lvb_blocks, rc);
+out_env:
+        lu_env_fini(&env);
+        return rc;
+}
+
+/*
+ * lvbo_update operation: refresh the LVB cached on \a res.
+ *
+ * This will be called in two ways:
+ *
+ * req != NULL : called by the DLM itself after a glimpse callback
+ * req == NULL : called by the ofd after a disk write
+ *
+ * With a request, the LVB found in the reply buffer is merged first; the
+ * on-disk attributes of the object are merged afterwards in both cases.
+ *
+ * If 'increase_only' is true, don't allow size and timestamps to move
+ * backwards. lvb_blocks is always taken from disk when it differs.
+ *
+ * Returns 0 on success and also when the resource has no LVB (an error is
+ * logged in that case); otherwise the error from lu_env_init(),
+ * ofd_object_find() or ofd_attr_get().
+ */
+static int ofd_lvbo_update(struct ldlm_resource *res,
+                           struct ptlrpc_request *req, int increase_only)
+{
+        struct ofd_device *ofd;
+        struct ofd_object *fo;
+        struct ofd_thread_info *info;
+        struct ost_lvb *lvb;
+        struct lu_env env;
+        int rc = 0;
+
+        ENTRY;
+
+        LASSERT(res != NULL);
+
+        ofd = ldlm_res_to_ns(res)->ns_lvbp;
+        LASSERT(ofd != NULL);
+
+        lvb = res->lr_lvb_data;
+        if (lvb == NULL) {
+                CERROR("%s: no lvb when running lvbo_update, res: "LPU64"!\n",
+                       ofd_obd(ofd)->obd_name, res->lr_name.name[0]);
+                RETURN(0);
+        }
+
+        rc = lu_env_init(&env, LCT_DT_THREAD);
+        if (rc)
+                GOTO(out, rc);
+
+        info = ofd_info_init(&env, NULL);
+        /* Update the LVB from the network message */
+        if (req != NULL) {
+                struct ost_lvb *rpc_lvb;
+
+                /* XXX update always from reply buffer */
+                rpc_lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
+                if (rpc_lvb == NULL) {
+                        CERROR("lustre_swab_buf failed\n");
+                        goto disk_update;
+                }
+                lock_res(res);
+                if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) {
+                        CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size: "
+                               LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                               lvb->lvb_size, rpc_lvb->lvb_size);
+                        lvb->lvb_size = rpc_lvb->lvb_size;
+                }
+                if (rpc_lvb->lvb_mtime > lvb->lvb_mtime || !increase_only) {
+                        CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime: "
+                               LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                               lvb->lvb_mtime, rpc_lvb->lvb_mtime);
+                        lvb->lvb_mtime = rpc_lvb->lvb_mtime;
+                }
+                if (rpc_lvb->lvb_atime > lvb->lvb_atime || !increase_only) {
+                        CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb atime: "
+                               LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                               lvb->lvb_atime, rpc_lvb->lvb_atime);
+                        lvb->lvb_atime = rpc_lvb->lvb_atime;
+                }
+                if (rpc_lvb->lvb_ctime > lvb->lvb_ctime || !increase_only) {
+                        CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb ctime: "
+                               LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                               lvb->lvb_ctime, rpc_lvb->lvb_ctime);
+                        lvb->lvb_ctime = rpc_lvb->lvb_ctime;
+                }
+                unlock_res(res);
+        }
+
+disk_update:
+        /* Update the LVB from the disk inode */
+        ofd_fid_from_resid(&info->fti_fid, &res->lr_name);
+        fo = ofd_object_find(&env, ofd, &info->fti_fid);
+        if (IS_ERR(fo))
+                GOTO(out_env, rc = PTR_ERR(fo));
+
+        rc = ofd_attr_get(&env, fo, &info->fti_attr);
+        if (rc)
+                GOTO(out_obj, rc);
+
+        lock_res(res);
+        if (info->fti_attr.la_size > lvb->lvb_size || !increase_only) {
+                /* %llu needs an explicit cast, as done for la_blocks below */
+                CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size from disk: "
+                       LPU64" -> %llu\n", res->lr_name.name[0],
+                       lvb->lvb_size,
+                       (unsigned long long)info->fti_attr.la_size);
+                lvb->lvb_size = info->fti_attr.la_size;
+        }
+
+        if (info->fti_attr.la_mtime > lvb->lvb_mtime || !increase_only) {
+                CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime from disk: "
+                       LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                       lvb->lvb_mtime, info->fti_attr.la_mtime);
+                lvb->lvb_mtime = info->fti_attr.la_mtime;
+        }
+        if (info->fti_attr.la_atime > lvb->lvb_atime || !increase_only) {
+                CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb atime from disk: "
+                       LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                       lvb->lvb_atime, info->fti_attr.la_atime);
+                lvb->lvb_atime = info->fti_attr.la_atime;
+        }
+        if (info->fti_attr.la_ctime > lvb->lvb_ctime || !increase_only) {
+                CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb ctime from disk: "
+                       LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                       lvb->lvb_ctime, info->fti_attr.la_ctime);
+                lvb->lvb_ctime = info->fti_attr.la_ctime;
+        }
+        /* blocks are not subject to increase_only */
+        if (lvb->lvb_blocks != info->fti_attr.la_blocks) {
+                CDEBUG(D_DLMTRACE,"res: "LPU64" updating lvb blocks from disk: "
+                       LPU64" -> %llu\n", res->lr_name.name[0],
+                       lvb->lvb_blocks,
+                       (unsigned long long)info->fti_attr.la_blocks);
+                lvb->lvb_blocks = info->fti_attr.la_blocks;
+        }
+        unlock_res(res);
+
+out_obj:
+        ofd_object_put(&env, fo);
+out_env:
+        lu_env_fini(&env);
+out:
+        return rc;
+}
+
+/*
+ * LVB operations of the OFD; the device setup code stores a pointer to
+ * this table in the namespace's ns_lvbo field.
+ */
+struct ldlm_valblock_ops ofd_lvbo = {
+        .lvbo_init      = ofd_lvbo_init,
+        .lvbo_update    = ofd_lvbo_update,
+        .lvbo_free      = ofd_lvbo_free,
+};