Whamcloud - gitweb
LU-1406 ofd: add DLM and LVB code
authorMikhail Pershin <tappro@whamcloud.com>
Wed, 23 May 2012 19:49:51 +0000 (23:49 +0400)
committerAndreas Dilger <adilger@whamcloud.com>
Tue, 19 Jun 2012 05:05:38 +0000 (01:05 -0400)
OFD dlm and lvb functions

Signed-off-by: Mikhail Pershin <tappro@whamcloud.com>
Change-Id: Ieddd2b16129f62c6e380d007aaa1f2de70e8a9ca
Reviewed-on: http://review.whamcloud.com/2892
Tested-by: Hudson
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
lustre/ofd/Makefile.in
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_dlm.c [new file with mode: 0644]
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_lvb.c [new file with mode: 0644]

index aa73c81..15f0c46 100644 (file)
@@ -1,7 +1,7 @@
 MODULES := ofd
 
 ofd-objs := ofd_dev.o ofd_obd.o ofd_fs.o ofd_trans.o ofd_objects.o
-ofd-objs += lproc_ofd.o ofd_capa.o ofd_fmd.o ofd_grant.o
+ofd-objs += lproc_ofd.o ofd_capa.o ofd_fmd.o ofd_grant.o ofd_dlm.o ofd_lvb.o
 
 EXTRA_DIST = $(ofd-objs:%.o=%.c) ofd_internal.h
 
index 4c816df..9297ec3 100644 (file)
@@ -517,6 +517,12 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
                GOTO(err_fini_stack, rc = -ENOMEM);
        /* set obd_namespace for compatibility with old code */
        obd->obd_namespace = m->ofd_namespace;
+       ldlm_register_intent(m->ofd_namespace, ofd_intent_policy);
+       m->ofd_namespace->ns_lvbo = &ofd_lvbo;
+       m->ofd_namespace->ns_lvbp = m;
+
+       ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
+                          "filter_ldlm_cb_client", &obd->obd_ldlm_client);
 
        dt_conf_get(env, m->ofd_osd, &m->ofd_dt_conf);
 
diff --git a/lustre/ofd/ofd_dlm.c b/lustre/ofd/ofd_dlm.c
new file mode 100644 (file)
index 0000000..34790c5
--- /dev/null
@@ -0,0 +1,241 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/ofd/ofd_dlm.c
+ *
+ * Author: Mike Pershin <tappro@whamcloud.com>
+ * Author: Alex Zhuravlev <bzzz@whamcloud.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include "ofd_internal.h"
+
+struct ofd_intent_args {
+       struct ldlm_lock        **victim;
+       __u64                    size;
+       int                     *liblustre;
+};
+
+static enum interval_iter ofd_intent_cb(struct interval_node *n, void *args)
+{
+       struct ldlm_interval     *node = (struct ldlm_interval *)n;
+       struct ofd_intent_args   *arg = args;
+       __u64                     size = arg->size;
+       struct ldlm_lock        **v = arg->victim;
+       struct ldlm_lock         *lck;
+
+       /* If the interval is lower than the current file size, just break. */
+       if (interval_high(n) <= size)
+               return INTERVAL_ITER_STOP;
+
+       cfs_list_for_each_entry(lck, &node->li_group, l_sl_policy) {
+               /* Don't send glimpse ASTs to liblustre clients.
+                * They aren't listening for them, and they do
+                * entirely synchronous I/O anyways. */
+               if (lck->l_export == NULL || lck->l_export->exp_libclient)
+                       continue;
+
+               if (*arg->liblustre)
+                       *arg->liblustre = 0;
+
+               if (*v == NULL) {
+                       *v = LDLM_LOCK_GET(lck);
+               } else if ((*v)->l_policy_data.l_extent.start <
+                          lck->l_policy_data.l_extent.start) {
+                       LDLM_LOCK_RELEASE(*v);
+                       *v = LDLM_LOCK_GET(lck);
+               }
+
+               /* the same policy group - every lock has the
+                * same extent, so needn't do it any more */
+               break;
+       }
+
+       return INTERVAL_ITER_CONT;
+}
+
+int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
+                     void *req_cookie, ldlm_mode_t mode, int flags,
+                     void *data)
+{
+       struct ptlrpc_request           *req = req_cookie;
+       struct ldlm_lock                *lock = *lockp, *l = NULL;
+       struct ldlm_resource            *res = lock->l_resource;
+       ldlm_processing_policy           policy;
+       struct ost_lvb                  *res_lvb, *reply_lvb;
+       struct ldlm_reply               *rep;
+       ldlm_error_t                     err;
+       int                              idx, rc;
+       int                              tmpflags = 0, only_liblustre = 1;
+       struct ldlm_interval_tree       *tree;
+       struct ofd_intent_args           arg;
+       __u32                            repsize[3] = {
+               [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+               [DLM_LOCKREPLY_OFF]   = sizeof(*rep),
+               [DLM_REPLY_REC_OFF]   = sizeof(*reply_lvb)
+       };
+
+       ENTRY;
+
+       policy = ldlm_get_processing_policy(res);
+       LASSERT(policy != NULL);
+       LASSERT(req != NULL);
+
+       rc = lustre_pack_reply(req, 3, repsize, NULL);
+       if (rc)
+               RETURN(req->rq_status = rc);
+
+       rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep));
+       LASSERT(rep != NULL);
+
+       reply_lvb = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
+                                  sizeof(*reply_lvb));
+       LASSERT(reply_lvb != NULL);
+
+       /* Call the extent policy function to see if our request can be
+        * granted, or is blocked.
+        * If the OST lock has LDLM_FL_HAS_INTENT set, it means a glimpse
+        * lock, and should not be granted if the lock will be blocked.
+        */
+
+       if (flags & LDLM_FL_BLOCK_NOWAIT) {
+               OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_AGL_DELAY, 5);
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_AGL_NOLOCK))
+                       RETURN(ELDLM_LOCK_ABORTED);
+       }
+
+       LASSERT(ns == ldlm_res_to_ns(res));
+       lock_res(res);
+       rc = policy(lock, &tmpflags, 0, &err, NULL);
+       check_res_locked(res);
+
+       /* The lock met with no resistance; we're finished. */
+       if (rc == LDLM_ITER_CONTINUE) {
+               /* do not grant locks to the liblustre clients: they cannot
+                * handle ASTs robustly.  We need to do this while still
+                * holding ns_lock to avoid the lock remaining on the res_link
+                * list (and potentially being added to l_pending_list by an
+                * AST) when we are going to drop this lock ASAP. */
+               if (lock->l_export->exp_libclient ||
+                   OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2)) {
+                       ldlm_resource_unlink_lock(lock);
+                       err = ELDLM_LOCK_ABORTED;
+               } else {
+                       err = ELDLM_LOCK_REPLACED;
+               }
+               unlock_res(res);
+               RETURN(err);
+       } else if (flags & LDLM_FL_BLOCK_NOWAIT) {
+               /* LDLM_FL_BLOCK_NOWAIT means it is for AGL. Do not send glimpse
+                * callback for glimpse size. The real size user will trigger
+                * the glimpse callback when necessary. */
+               unlock_res(res);
+               RETURN(ELDLM_LOCK_ABORTED);
+       }
+
+       /* Do not grant any lock, but instead send GL callbacks.  The extent
+        * policy nicely created a list of all PW locks for us.  We will choose
+        * the highest of those which are larger than the size in the LVB, if
+        * any, and perform a glimpse callback. */
+       res_lvb = res->lr_lvb_data;
+       LASSERT(res_lvb != NULL);
+       *reply_lvb = *res_lvb;
+
+       /*
+        * ->ns_lock guarantees that no new locks are granted, and,
+        *  therefore, that res->lr_lvb_data cannot increase beyond the
+        *  end of already granted lock. As a result, it is safe to
+        *  check against "stale" reply_lvb->lvb_size value without
+        *  res->lr_lvb_sem.
+        */
+       arg.size = reply_lvb->lvb_size;
+       arg.victim = &l;
+       arg.liblustre = &only_liblustre;
+
+       for (idx = 0; idx < LCK_MODE_NUM; idx++) {
+               tree = &res->lr_itree[idx];
+               if (tree->lit_mode == LCK_PR)
+                       continue;
+
+               interval_iterate_reverse(tree->lit_root, ofd_intent_cb, &arg);
+       }
+       unlock_res(res);
+
+       /* There were no PW locks beyond the size in the LVB; finished. */
+       if (l == NULL) {
+               if (only_liblustre) {
+                       /* If we discovered a liblustre client with a PW lock,
+                        * however, the LVB may be out of date!  The LVB is
+                        * updated only on glimpse (which we don't do for
+                        * liblustre clients) and cancel (which the client
+                        * obviously has not yet done).  So if it has written
+                        * data but kept the lock, the LVB is stale and needs
+                        * to be updated from disk.
+                        *
+                        * Of course, this will all disappear when we switch to
+                        * taking liblustre locks on the OST. */
+                       ldlm_res_lvbo_update(res, NULL, 1);
+               }
+               RETURN(ELDLM_LOCK_ABORTED);
+       }
+
+       /*
+        * This check is for lock taken in ofd_prepare_destroy() that does
+        * not have l_glimpse_ast set. So the logic is: if there is a lock
+        * with no l_glimpse_ast set, this object is being destroyed already.
+        * Hence, if you are grabbing DLM locks on the server, always set
+        * non-NULL glimpse_ast (e.g., ldlm_request.c:ldlm_glimpse_ast()).
+        */
+       if (l->l_glimpse_ast == NULL) {
+               /* We are racing with unlink(); just return -ENOENT */
+               rep->lock_policy_res1 = -ENOENT;
+               goto out;
+       }
+
+       LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
+       rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
+
+       lock_res(res);
+       *reply_lvb = *res_lvb;
+       unlock_res(res);
+
+out:
+       LDLM_LOCK_RELEASE(l);
+
+       RETURN(ELDLM_LOCK_ABORTED);
+}
+
index c41f992..f3a1a2c 100644 (file)
@@ -407,6 +407,14 @@ void ofd_fmd_drop(struct obd_export *exp, struct lu_fid *fid);
 #define ofd_fmd_drop(exp, fid) do {} while (0)
 #endif
 
+/* ofd_lvb.c */
+extern struct ldlm_valblock_ops ofd_lvbo;
+
+/* ofd_dlm.c */
+int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
+                     void *req_cookie, ldlm_mode_t mode, int flags,
+                     void *data);
+
 static inline struct ofd_thread_info * ofd_info(const struct lu_env *env)
 {
        struct ofd_thread_info *info;
@@ -437,6 +445,39 @@ static inline struct ofd_thread_info * ofd_info_init(const struct lu_env *env,
        return info;
 }
 
+/* The same as osc_build_res_name() */
+static inline void ofd_build_resid(const struct lu_fid *fid,
+                                  struct ldlm_res_id *resname)
+{
+       if (fid_is_idif(fid)) {
+               /* get id/seq like ostid_idif_pack() does */
+               osc_build_res_name(fid_idif_id(fid_seq(fid), fid_oid(fid),
+                                              fid_ver(fid)),
+                                  FID_SEQ_OST_MDT0, resname);
+       } else {
+               /* In the future, where OSTs have FID sequences allocated. */
+               fid_build_reg_res_name(fid, resname);
+       }
+}
+
+static inline void ofd_fid_from_resid(struct lu_fid *fid,
+                                     const struct ldlm_res_id *name)
+{
+       /* if seq is FID_SEQ_OST_MDT0 then we have IDIF and resid was built
+        * using osc_build_res_name function. */
+       if (fid_seq_is_mdt0(name->name[LUSTRE_RES_ID_VER_OID_OFF])) {
+               struct ost_id ostid;
+
+               ostid.oi_id = name->name[LUSTRE_RES_ID_SEQ_OFF];
+               ostid.oi_seq = name->name[LUSTRE_RES_ID_VER_OID_OFF];
+               fid_ostid_unpack(fid, &ostid, 0);
+       } else {
+               fid->f_seq = name->name[LUSTRE_RES_ID_SEQ_OFF];
+               fid->f_oid = (__u32)name->name[LUSTRE_RES_ID_VER_OID_OFF];
+               fid->f_ver = name->name[LUSTRE_RES_ID_VER_OID_OFF] >> 32;
+       }
+}
+
 /* sync on lock cancel is useless when we force a journal flush,
  * and if we enable async journal commit, we should also turn on
  * sync on lock cancel if it is not enabled already. */
diff --git a/lustre/ofd/ofd_lvb.c b/lustre/ofd/ofd_lvb.c
new file mode 100644 (file)
index 0000000..208198c
--- /dev/null
@@ -0,0 +1,250 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/ofd/ofd_lvb.c
+ *
+ * Author: Mikhail Pershin <tappro@whamcloud.com>
+ * Author: Alexey Zhuravlev <bzzz@whamcloud.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include "ofd_internal.h"
+
+static int ofd_lvbo_free(struct ldlm_resource *res)
+{
+       if (res->lr_lvb_data)
+               OBD_FREE(res->lr_lvb_data, res->lr_lvb_len);
+
+       return 0;
+}
+
+/* Called with res->lr_lvb_sem held */
+static int ofd_lvbo_init(struct ldlm_resource *res)
+{
+       struct ost_lvb          *lvb = NULL;
+       struct ofd_device       *ofd;
+       struct ofd_object       *fo;
+       struct ofd_thread_info  *info;
+       struct lu_env            env;
+       int                      rc = 0;
+
+       ENTRY;
+
+       LASSERT(res);
+       LASSERT_MUTEX_LOCKED(&res->lr_lvb_mutex);
+
+       if (res->lr_lvb_data != NULL)
+               RETURN(0);
+
+       ofd = ldlm_res_to_ns(res)->ns_lvbp;
+       LASSERT(ofd != NULL);
+
+       rc = lu_env_init(&env, LCT_DT_THREAD);
+       if (rc)
+               RETURN(rc);
+
+       OBD_ALLOC_PTR(lvb);
+       if (lvb == NULL)
+               RETURN(-ENOMEM);
+
+       res->lr_lvb_data = lvb;
+       res->lr_lvb_len = sizeof(*lvb);
+
+       info = ofd_info_init(&env, NULL);
+       ofd_fid_from_resid(&info->fti_fid, &res->lr_name);
+       fo = ofd_object_find(&env, ofd, &info->fti_fid);
+       if (IS_ERR(fo))
+               GOTO(out, rc = PTR_ERR(fo));
+
+       rc = ofd_attr_get(&env, fo, &info->fti_attr);
+       if (rc)
+               GOTO(out_put, rc);
+
+       lvb->lvb_size = info->fti_attr.la_size;
+       lvb->lvb_blocks = info->fti_attr.la_blocks;
+       lvb->lvb_mtime = info->fti_attr.la_mtime;
+       lvb->lvb_atime = info->fti_attr.la_atime;
+       lvb->lvb_ctime = info->fti_attr.la_ctime;
+
+       CDEBUG(D_DLMTRACE, "res: "LPX64" initial lvb size: "LPX64", "
+              "mtime: "LPX64", blocks: "LPX64"\n",
+              res->lr_name.name[0], lvb->lvb_size,
+              lvb->lvb_mtime, lvb->lvb_blocks);
+
+       EXIT;
+out_put:
+       ofd_object_put(&env, fo);
+out:
+       lu_env_fini(&env);
+       if (rc)
+               OST_LVB_SET_ERR(lvb->lvb_blocks, rc);
+       /* Don't free lvb data on lookup error */
+       return rc;
+}
+
+/* This will be called in two ways:
+ *
+ *   r != NULL : called by the DLM itself after a glimpse callback
+ *   r == NULL : called by the ofd after a disk write
+ *
+ *   If 'increase_only' is true, don't allow values to move backwards.
+ */
+static int ofd_lvbo_update(struct ldlm_resource *res,
+                          struct ptlrpc_request *req, int increase_only)
+{
+       struct ofd_device       *ofd;
+       struct ofd_object       *fo;
+       struct ofd_thread_info  *info;
+       struct ost_lvb          *lvb;
+       struct lu_env            env;
+       int                      rc = 0;
+
+       ENTRY;
+
+       LASSERT(res != NULL);
+
+       ofd = ldlm_res_to_ns(res)->ns_lvbp;
+       LASSERT(ofd != NULL);
+
+       lvb = res->lr_lvb_data;
+       if (lvb == NULL) {
+               CERROR("%s: no lvb when running lvbo_update, res: "LPU64"!\n",
+                      ofd_obd(ofd)->obd_name, res->lr_name.name[0]);
+               RETURN(0);
+       }
+
+       rc = lu_env_init(&env, LCT_DT_THREAD);
+       if (rc)
+               GOTO(out_unlock, rc);
+
+       info = ofd_info_init(&env, NULL);
+       /* Update the LVB from the network message */
+       if (req != NULL) {
+               struct ost_lvb *rpc_lvb;
+
+               /* XXX update always from reply buffer */
+               rpc_lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
+               if (rpc_lvb == NULL) {
+                       CERROR("lustre_swab_buf failed\n");
+                       goto disk_update;
+               }
+               lock_res(res);
+               if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) {
+                       CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size: "
+                              LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                              lvb->lvb_size, rpc_lvb->lvb_size);
+                       lvb->lvb_size = rpc_lvb->lvb_size;
+               }
+               if (rpc_lvb->lvb_mtime > lvb->lvb_mtime || !increase_only) {
+                       CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime: "
+                              LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                              lvb->lvb_mtime, rpc_lvb->lvb_mtime);
+                       lvb->lvb_mtime = rpc_lvb->lvb_mtime;
+               }
+               if (rpc_lvb->lvb_atime > lvb->lvb_atime || !increase_only) {
+                       CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb atime: "
+                              LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                              lvb->lvb_atime, rpc_lvb->lvb_atime);
+                       lvb->lvb_atime = rpc_lvb->lvb_atime;
+               }
+               if (rpc_lvb->lvb_ctime > lvb->lvb_ctime || !increase_only) {
+                       CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb ctime: "
+                              LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                              lvb->lvb_ctime, rpc_lvb->lvb_ctime);
+                       lvb->lvb_ctime = rpc_lvb->lvb_ctime;
+               }
+               unlock_res(res);
+       }
+
+disk_update:
+       /* Update the LVB from the disk inode */
+       ofd_fid_from_resid(&info->fti_fid, &res->lr_name);
+       fo = ofd_object_find(&env, ofd, &info->fti_fid);
+       if (IS_ERR(fo))
+               GOTO(out_env, rc = PTR_ERR(fo));
+
+       rc = ofd_attr_get(&env, fo, &info->fti_attr);
+       if (rc)
+               GOTO(out_obj, rc);
+
+       lock_res(res);
+       if (info->fti_attr.la_size > lvb->lvb_size || !increase_only) {
+               CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size from disk: "
+                      LPU64" -> %llu\n", res->lr_name.name[0],
+                      lvb->lvb_size, info->fti_attr.la_size);
+               lvb->lvb_size = info->fti_attr.la_size;
+       }
+
+       if (info->fti_attr.la_mtime >lvb->lvb_mtime || !increase_only) {
+               CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime from disk: "
+                      LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                      lvb->lvb_mtime, info->fti_attr.la_mtime);
+               lvb->lvb_mtime = info->fti_attr.la_mtime;
+       }
+       if (info->fti_attr.la_atime >lvb->lvb_atime || !increase_only) {
+               CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb atime from disk: "
+                      LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                      lvb->lvb_atime, info->fti_attr.la_atime);
+               lvb->lvb_atime = info->fti_attr.la_atime;
+       }
+       if (info->fti_attr.la_ctime >lvb->lvb_ctime || !increase_only) {
+               CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb ctime from disk: "
+                      LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                      lvb->lvb_ctime, info->fti_attr.la_ctime);
+               lvb->lvb_ctime = info->fti_attr.la_ctime;
+       }
+       if (lvb->lvb_blocks != info->fti_attr.la_blocks) {
+               CDEBUG(D_DLMTRACE,"res: "LPU64" updating lvb blocks from disk: "
+                      LPU64" -> %llu\n", res->lr_name.name[0],
+                      lvb->lvb_blocks,
+                      (unsigned long long)info->fti_attr.la_blocks);
+               lvb->lvb_blocks = info->fti_attr.la_blocks;
+       }
+       unlock_res(res);
+
+out_obj:
+       ofd_object_put(&env, fo);
+out_env:
+       lu_env_fini(&env);
+out_unlock:
+       return rc;
+}
+
+struct ldlm_valblock_ops ofd_lvbo = {
+       lvbo_init:      ofd_lvbo_init,
+       lvbo_update:    ofd_lvbo_update,
+       lvbo_free:      ofd_lvbo_free,
+};