X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fofd%2Fofd_lvb.c;h=7b463889ae5712cf8b0552c08f56dc4bbfa38cf0;hb=99bb9f91f5c5ca6a380b22efa04a3c00c8f520ca;hp=fa1e6f4c09d120ba7b026e06b3abf2c09f95c2de;hpb=abd8c6dd6fa708e8064f1095350eba23b9437b7c;p=fs%2Flustre-release.git diff --git a/lustre/ofd/ofd_lvb.c b/lustre/ofd/ofd_lvb.c index fa1e6f4..7b46388 100644 --- a/lustre/ofd/ofd_lvb.c +++ b/lustre/ofd/ofd_lvb.c @@ -15,11 +15,7 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ @@ -27,7 +23,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -35,14 +31,34 @@ * * lustre/ofd/ofd_lvb.c * - * Author: Mikhail Pershin - * Author: Alexey Zhuravlev + * This file contains methods for OBD Filter Device (OFD) + * Lock Value Block (LVB) operations. + * + * LVB is special opaque (to LDLM) data that is associated with an LDLM lock + * and transferred from client to server and back. OFD LVBs are used to + * maintain current object size/times. + * + * Author: Andreas Dilger + * Author: Mikhail Pershin + * Author: Alexey Zhuravlev */ #define DEBUG_SUBSYSTEM S_FILTER +#include #include "ofd_internal.h" +/** + * Implementation of ldlm_valblock_ops::lvbo_free for OFD. + * + * This function frees allocated LVB data if it is associated with the given + * LDLM resource.
+ * + * \param[in] res LDLM resource + * + * \retval 0 on successful setup + * \retval negative value on error + */ static int ofd_lvbo_free(struct ldlm_resource *res) { if (res->lr_lvb_data) @@ -51,20 +67,33 @@ static int ofd_lvbo_free(struct ldlm_resource *res) return 0; } -/* Called with res->lr_lvb_sem held */ -static int ofd_lvbo_init(struct ldlm_resource *res) +/** + * Implementation of ldlm_valblock_ops::lvbo_init for OFD. + * + * This function allocates and initializes new LVB data for the given + * LDLM resource if it is not allocated yet. New LVB is filled with attributes + * of the object associated with that resource. Function does nothing if LVB + * for the given LDLM resource is allocated already. + * + * Called with res->lr_lvb_mutex held. + * + * \param[in] res LDLM resource + * + * \retval 0 on successful setup + * \retval negative value on error + */ +static int ofd_lvbo_init(const struct lu_env *env, struct ldlm_resource *res) { - struct ost_lvb *lvb = NULL; + struct ost_lvb *lvb; struct ofd_device *ofd; struct ofd_object *fo; struct ofd_thread_info *info; - struct lu_env env; - int rc = 0; - + struct lu_env _env; + int rc = 0; ENTRY; LASSERT(res); - LASSERT_MUTEX_LOCKED(&res->lr_lvb_mutex); + LASSERT(mutex_is_locked(&res->lr_lvb_mutex)); if (res->lr_lvb_data != NULL) RETURN(0); @@ -75,26 +104,30 @@ static int ofd_lvbo_init(struct ldlm_resource *res) if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_LVB)) RETURN(-ENOMEM); - rc = lu_env_init(&env, LCT_DT_THREAD); - if (rc) - RETURN(rc); + if (!env) { + rc = lu_env_init(&_env, LCT_DT_THREAD); + if (rc) + RETURN(rc); + env = &_env; + } OBD_ALLOC_PTR(lvb); if (lvb == NULL) GOTO(out, rc = -ENOMEM); + info = ofd_info(env); res->lr_lvb_data = lvb; res->lr_lvb_len = sizeof(*lvb); - info = ofd_info_init(&env, NULL); - ofd_fid_from_resid(&info->fti_fid, &res->lr_name); - fo = ofd_object_find(&env, ofd, &info->fti_fid); + ost_fid_from_resid(&info->fti_fid, &res->lr_name, + ofd->ofd_lut.lut_lsd.lsd_osd_index); +
fo = ofd_object_find(env, ofd, &info->fti_fid); if (IS_ERR(fo)) - GOTO(out, rc = PTR_ERR(fo)); + GOTO(out_lvb, rc = PTR_ERR(fo)); - rc = ofd_attr_get(&env, fo, &info->fti_attr); + rc = ofd_attr_get(env, fo, &info->fti_attr); if (rc) - GOTO(out_put, rc); + GOTO(out_obj, rc); lvb->lvb_size = info->fti_attr.la_size; lvb->lvb_blocks = info->fti_attr.la_blocks; @@ -102,58 +135,83 @@ static int ofd_lvbo_init(struct ldlm_resource *res) lvb->lvb_atime = info->fti_attr.la_atime; lvb->lvb_ctime = info->fti_attr.la_ctime; - CDEBUG(D_DLMTRACE, "res: "LPX64" initial lvb size: "LPX64", " - "mtime: "LPX64", blocks: "LPX64"\n", - res->lr_name.name[0], lvb->lvb_size, + CDEBUG(D_DLMTRACE, "res: "DFID" initial lvb size: %llu, " + "mtime: %#llx, blocks: %#llx\n", + PFID(&info->fti_fid), lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_blocks); + info->fti_attr.la_valid = 0; + EXIT; -out_put: - ofd_object_put(&env, fo); -out: - lu_env_fini(&env); - if (rc && lvb != NULL) +out_obj: + ofd_object_put(env, fo); +out_lvb: + if (rc != 0) OST_LVB_SET_ERR(lvb->lvb_blocks, rc); +out: /* Don't free lvb data on lookup error */ + if (env && env == &_env) + lu_env_fini(&_env); return rc; } -/* This will be called in two ways: +/** + * Implementation of ldlm_valblock_ops::lvbo_update for OFD. + * + * When a client generates a glimpse enqueue, it wants to get the current + * file size and updated attributes for a stat() type operation, but these + * attributes may be writeback cached on another client. The client with + * the DLM extent lock at the highest offset is asked for its current + * attributes via a glimpse callback on its extent lock, on the assumption + * that it has the highest file size and the newest timestamps. The timestamps + * are guaranteed to be correct if there is only a single writer on the file, + * but may be slightly inaccurate if there are multiple concurrent writers on + * the same object. 
In order to avoid race conditions between the glimpse AST + * and the client cancelling the lock, ofd_lvbo_update() also updates + * the attributes from the local object. If the last client hasn't done any + * writes yet, or has already written its data and cancelled its lock before + * it processed the glimpse, then the local inode will have more up-to-date + * information. * - * r != NULL : called by the DLM itself after a glimpse callback - * r == NULL : called by the ofd after a disk write + * This is called in two ways: + * \a req != NULL : called by the DLM itself after a glimpse callback + * \a req == NULL : called by the OFD after a disk write * - * If 'increase_only' is true, don't allow values to move backwards. + * \param[in] lock LDLM lock + * \param[in] req PTLRPC request + * \param[in] increase_only don't allow LVB values to decrease + * + * \retval 0 on successful update + * \retval negative value on error */ -static int ofd_lvbo_update(struct ldlm_resource *res, - struct ptlrpc_request *req, int increase_only) +static int ofd_lvbo_update(const struct lu_env *env, struct ldlm_resource *res, + struct ldlm_lock *lock, struct ptlrpc_request *req, + int increase_only) { + struct ofd_thread_info *info; struct ofd_device *ofd; struct ofd_object *fo; - struct ofd_thread_info *info; struct ost_lvb *lvb; - struct lu_env env; int rc = 0; ENTRY; + LASSERT(env); + info = ofd_info(env); LASSERT(res != NULL); ofd = ldlm_res_to_ns(res)->ns_lvbp; LASSERT(ofd != NULL); + fid_extract_from_res_name(&info->fti_fid, &res->lr_name); + lvb = res->lr_lvb_data; if (lvb == NULL) { - CERROR("%s: no lvb when running lvbo_update, res: "LPU64"!\n", - ofd_obd(ofd)->obd_name, res->lr_name.name[0]); - RETURN(0); + CERROR("%s: no LVB data for "DFID"\n", + ofd_name(ofd), PFID(&info->fti_fid)); + GOTO(out, rc = 0); } - rc = lu_env_init(&env, LCT_DT_THREAD); - if (rc) - GOTO(out_unlock, rc); - - info = ofd_info_init(&env, NULL); /* Update the LVB from the network message */ if (req !=
NULL) { struct ost_lvb *rpc_lvb; @@ -187,32 +245,32 @@ static int ofd_lvbo_update(struct ldlm_resource *res, lock_res(res); if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) { - CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size: " - LPU64" -> "LPU64"\n", res->lr_name.name[0], + CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size: " + "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_size, rpc_lvb->lvb_size); lvb->lvb_size = rpc_lvb->lvb_size; } if (rpc_lvb->lvb_mtime > lvb->lvb_mtime || !increase_only) { - CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime: " - LPU64" -> "LPU64"\n", res->lr_name.name[0], + CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime: " + "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_mtime, rpc_lvb->lvb_mtime); lvb->lvb_mtime = rpc_lvb->lvb_mtime; } if (rpc_lvb->lvb_atime > lvb->lvb_atime || !increase_only) { - CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb atime: " - LPU64" -> "LPU64"\n", res->lr_name.name[0], + CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime: " + "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_atime, rpc_lvb->lvb_atime); lvb->lvb_atime = rpc_lvb->lvb_atime; } if (rpc_lvb->lvb_ctime > lvb->lvb_ctime || !increase_only) { - CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb ctime: " - LPU64" -> "LPU64"\n", res->lr_name.name[0], + CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime: " + "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_ctime, rpc_lvb->lvb_ctime); lvb->lvb_ctime = rpc_lvb->lvb_ctime; } if (rpc_lvb->lvb_blocks > lvb->lvb_blocks || !increase_only) { - CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb blocks: " - LPU64" -> "LPU64"\n", res->lr_name.name[0], + CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks: " + "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_blocks, rpc_lvb->lvb_blocks); lvb->lvb_blocks = rpc_lvb->lvb_blocks; } @@ -221,58 +279,67 @@ static int ofd_lvbo_update(struct ldlm_resource *res, disk_update: /* Update the LVB from the disk inode */ - ofd_fid_from_resid(&info->fti_fid, &res->lr_name); - 
fo = ofd_object_find(&env, ofd, &info->fti_fid); + ost_fid_from_resid(&info->fti_fid, &res->lr_name, + ofd->ofd_lut.lut_lsd.lsd_osd_index); + fo = ofd_object_find(env, ofd, &info->fti_fid); if (IS_ERR(fo)) - GOTO(out_env, rc = PTR_ERR(fo)); + GOTO(out, rc = PTR_ERR(fo)); - rc = ofd_attr_get(&env, fo, &info->fti_attr); + rc = ofd_attr_get(env, fo, &info->fti_attr); if (rc) GOTO(out_obj, rc); lock_res(res); if (info->fti_attr.la_size > lvb->lvb_size || !increase_only) { - CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size from disk: " - LPU64" -> %llu\n", res->lr_name.name[0], + CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb size from disk: " + "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_size, info->fti_attr.la_size); lvb->lvb_size = info->fti_attr.la_size; } if (info->fti_attr.la_mtime >lvb->lvb_mtime || !increase_only) { - CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime from disk: " - LPU64" -> "LPU64"\n", res->lr_name.name[0], + CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb mtime from disk: " + "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_mtime, info->fti_attr.la_mtime); lvb->lvb_mtime = info->fti_attr.la_mtime; } if (info->fti_attr.la_atime >lvb->lvb_atime || !increase_only) { - CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb atime from disk: " - LPU64" -> "LPU64"\n", res->lr_name.name[0], + CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb atime from disk: " + "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_atime, info->fti_attr.la_atime); lvb->lvb_atime = info->fti_attr.la_atime; } if (info->fti_attr.la_ctime >lvb->lvb_ctime || !increase_only) { - CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb ctime from disk: " - LPU64" -> "LPU64"\n", res->lr_name.name[0], + CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb ctime from disk: " + "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_ctime, info->fti_attr.la_ctime); lvb->lvb_ctime = info->fti_attr.la_ctime; } if (info->fti_attr.la_blocks > lvb->lvb_blocks || !increase_only) { - CDEBUG(D_DLMTRACE,"res: "LPU64" updating 
lvb blocks from disk: " - LPU64" -> %llu\n", res->lr_name.name[0], - lvb->lvb_blocks, + CDEBUG(D_DLMTRACE, "res: "DFID" updating lvb blocks from disk: " + "%llu -> %llu\n", PFID(&info->fti_fid), lvb->lvb_blocks, (unsigned long long)info->fti_attr.la_blocks); lvb->lvb_blocks = info->fti_attr.la_blocks; } unlock_res(res); out_obj: - ofd_object_put(&env, fo); -out_env: - lu_env_fini(&env); -out_unlock: + ofd_object_put(env, fo); +out: return rc; } +/** + * Implementation of ldlm_valblock_ops::lvbo_size for OFD. + * + * This function returns size of LVB data so appropriate RPC size will be + * reserved. This is used for compatibility needs between server and client + * of different Lustre versions. + * + * \param[in] lock LDLM lock + * + * \retval size of LVB data + */ static int ofd_lvbo_size(struct ldlm_lock *lock) { if (lock->l_export != NULL && exp_connect_lvb_type(lock->l_export)) @@ -281,7 +348,20 @@ static int ofd_lvbo_size(struct ldlm_lock *lock) return sizeof(struct ost_lvb_v1); } -static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int buflen) +/** + * Implementation of ldlm_valblock_ops::lvbo_fill for OFD. 
+ * + * This function is called to fill the given RPC buffer \a buf with LVB data. + * + * \param[in] env execution environment + * \param[in] lock LDLM lock + * \param[out] buf RPC buffer to fill + * \param[in] buflen pointer to the buffer length + * + * \retval size of LVB data written into \a buf buffer + */ +static int ofd_lvbo_fill(const struct lu_env *env, struct ldlm_lock *lock, + void *buf, int *buflen) { struct ldlm_resource *res = lock->l_resource; int lvb_len; @@ -293,8 +373,8 @@ static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int buflen) lvb_len = ofd_lvbo_size(lock); LASSERT(lvb_len <= res->lr_lvb_len); - if (lvb_len > buflen) - lvb_len = buflen; + if (lvb_len > *buflen) + lvb_len = *buflen; lock_res(res); memcpy(buf, res->lr_lvb_data, lvb_len); @@ -304,9 +384,9 @@ static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int buflen) } struct ldlm_valblock_ops ofd_lvbo = { - lvbo_init: ofd_lvbo_init, - lvbo_update: ofd_lvbo_update, - lvbo_free: ofd_lvbo_free, - lvbo_size: ofd_lvbo_size, - lvbo_fill: ofd_lvbo_fill + .lvbo_init = ofd_lvbo_init, + .lvbo_update = ofd_lvbo_update, + .lvbo_free = ofd_lvbo_free, + .lvbo_size = ofd_lvbo_size, + .lvbo_fill = ofd_lvbo_fill };