From: Mikhail Pershin Date: Wed, 23 May 2012 19:00:33 +0000 (+0400) Subject: LU-1406 ofd: IO operations X-Git-Tag: 2.2.57~12 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=a67ea1c51daad1a2c774bdceeabbfac927d348e6 LU-1406 ofd: IO operations add IO functions to OFD Signed-off-by: Mikhail Pershin Change-Id: Ie78e92f5f770ff07c94bf87b2d5ee8300673d271 Reviewed-on: http://review.whamcloud.com/2893 Reviewed-by: Andreas Dilger Tested-by: Hudson Tested-by: Maloo Reviewed-by: Alex Zhuravlev --- diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 7c20ac2..768dfb9 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -864,6 +864,8 @@ struct obd_trans_info { struct llog_cookie oti_onecookie; struct llog_cookie *oti_logcookies; int oti_numcookies; + /** synchronous write is needed */ + long oti_sync_write:1; /* initial thread handling transaction */ struct ptlrpc_thread * oti_thread; diff --git a/lustre/ofd/Makefile.in b/lustre/ofd/Makefile.in index 15f0c46..2d5aa43 100644 --- a/lustre/ofd/Makefile.in +++ b/lustre/ofd/Makefile.in @@ -1,6 +1,6 @@ MODULES := ofd -ofd-objs := ofd_dev.o ofd_obd.o ofd_fs.o ofd_trans.o ofd_objects.o +ofd-objs := ofd_dev.o ofd_obd.o ofd_fs.o ofd_trans.o ofd_objects.o ofd_io.o ofd-objs += lproc_ofd.o ofd_capa.o ofd_fmd.o ofd_grant.o ofd_dlm.o ofd_lvb.o EXTRA_DIST = $(ofd-objs:%.o=%.c) ofd_internal.h diff --git a/lustre/ofd/ofd_internal.h b/lustre/ofd/ofd_internal.h index f3a1a2c..6eca8c2 100644 --- a/lustre/ofd/ofd_internal.h +++ b/lustre/ofd/ofd_internal.h @@ -50,6 +50,10 @@ OBD_INCOMPAT_COMMON_LR) #define OFD_MAX_GROUPS 256 +/* Limit the returned fields marked valid to those that we actually might set */ +#define OFD_VALID_FLAGS (LA_TYPE | LA_MODE | LA_SIZE | LA_BLOCKS | \ + LA_BLKSIZE | LA_ATIME | LA_MTIME | LA_CTIME) + /* per-client-per-object persistent state (LRU) */ struct ofd_mod_data { cfs_list_t fmd_list; /* linked to fed_mod_list */ @@ -90,6 +94,7 @@ struct ofd_device { cfs_mutex_t ofd_create_locks[OFD_MAX_GROUPS]; struct dt_object *ofd_lastid_obj[OFD_MAX_GROUPS]; cfs_spinlock_t ofd_objid_lock; + unsigned long ofd_destroys_in_progress; /* protect all statfs-related counters */ cfs_spinlock_t ofd_osfs_lock; @@ -253,6 +258,8 @@ struct ofd_thread_info { struct lu_fid fti_fid; struct lu_attr fti_attr; struct lu_attr fti_attr2; + struct ldlm_res_id fti_resid; + struct filter_fid fti_mds_fid; struct filter_fid fti_mds_fid2; struct ost_id fti_ostid; struct ofd_object *fti_obj; @@ -297,6 +304,18 @@ int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd, struct obd_device *obd); void ofd_fs_cleanup(const struct lu_env *env, struct ofd_device *ofd); +/* ofd_io.c */ +int ofd_preprw(const struct lu_env *env,int cmd, struct obd_export *exp, + struct obdo *oa, int objcount, struct obd_ioobj *obj, + struct niobuf_remote *rnb, int *nr_local, + struct niobuf_local *lnb, struct obd_trans_info *oti, + struct lustre_capa *capa); +int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, + struct obdo *oa, int objcount, struct obd_ioobj *obj, + struct niobuf_remote *rnb, int npages, + struct niobuf_local *lnb, struct obd_trans_info *oti, + int old_rc); + /* ofd_trans.c */ struct thandle *ofd_trans_create(const struct lu_env *env, struct ofd_device *ofd); @@ -478,6 +497,26 @@ static inline void ofd_fid_from_resid(struct lu_fid *fid, } } +static inline void ofd_oti2info(struct ofd_thread_info *info, + struct obd_trans_info *oti) +{ + info->fti_xid = oti->oti_xid; + info->fti_transno = oti->oti_transno; + info->fti_pre_version = oti->oti_pre_version; +} + +static inline void ofd_info2oti(struct ofd_thread_info *info, + struct obd_trans_info *oti) +{ + oti->oti_xid = info->fti_xid; + LASSERTF(ergo(oti->oti_transno > 0, + oti->oti_transno == info->fti_transno), + "Overwrite replay transno "LPX64" by "LPX64"\n", + oti->oti_transno, info->fti_transno); + oti->oti_transno = info->fti_transno; + oti->oti_pre_version = info->fti_pre_version; +} + /* sync on lock cancel is useless when we force a journal flush, * and if we enable async journal commit, we should also turn on * sync on lock cancel if it is not enabled already. */ @@ -505,9 +544,12 @@ static inline void ofd_prepare_fidea(struct filter_fid *ff, struct obdo *oa) ff->ff_seq = cpu_to_le64(oa->o_seq); } -/* niobuf_local has no rnb_ prefix in master */ +/* niobuf_remote has no rnb_ prefix in master */ #define rnb_offset offset #define rnb_flags flags #define rnb_len len +/* the same for niobuf_local */ +#define lnb_flags flags +#define lnb_rc rc #endif /* _OFD_INTERNAL_H */ diff --git a/lustre/ofd/ofd_io.c b/lustre/ofd/ofd_io.c new file mode 100644 index 0000000..a75e5cd --- /dev/null +++ b/lustre/ofd/ofd_io.c @@ -0,0 +1,542 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/ofd/ofd_io.c + * + * Author: Alex Tomas + */ + +#define DEBUG_SUBSYSTEM S_FILTER + +#include "ofd_internal.h" + +static int ofd_preprw_read(const struct lu_env *env, struct ofd_device *ofd, + struct lu_fid *fid, struct lu_attr *la, int niocount, + struct niobuf_remote *rnb, int *nr_local, + struct niobuf_local *lnb) +{ + struct ofd_object *fo; + int i, j, rc, tot_bytes = 0; + + ENTRY; + LASSERT(env != NULL); + + fo = ofd_object_find(env, ofd, fid); + if (IS_ERR(fo)) + RETURN(PTR_ERR(fo)); + LASSERT(fo != NULL); + + ofd_read_lock(env, fo); + if (!ofd_object_exists(fo)) + GOTO(unlock, rc = -ENOENT); + + /* parse remote buffers to local buffers and prepare the latter */ + for (i = 0, j = 0; i < niocount; i++) { + rc = dt_bufs_get(env, ofd_object_child(fo), rnb + i, + lnb + j, 0, ofd_object_capa(env, fo)); + LASSERT(rc > 0); + LASSERT(rc <= PTLRPC_MAX_BRW_PAGES); + /* correct index for local buffers to continue with */ + j += rc; + LASSERT(j <= PTLRPC_MAX_BRW_PAGES); + tot_bytes += rnb[i].rnb_len; + } + + *nr_local = j; + LASSERT(*nr_local > 0 && *nr_local <= PTLRPC_MAX_BRW_PAGES); + rc = dt_attr_get(env, ofd_object_child(fo), la, + ofd_object_capa(env, fo)); + if (unlikely(rc)) + GOTO(buf_put, rc); + + rc = dt_read_prep(env, ofd_object_child(fo), lnb, *nr_local); + if (unlikely(rc)) + GOTO(buf_put, rc); + lprocfs_counter_add(ofd_obd(ofd)->obd_stats, + LPROC_OFD_READ_BYTES, tot_bytes); + RETURN(0); + +buf_put: + dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local); +unlock: + ofd_read_unlock(env, fo); + ofd_object_put(env, fo); + return rc; +} + +static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp, + struct ofd_device *ofd, struct lu_fid *fid, + struct lu_attr *la, struct obdo *oa, + int objcount, struct obd_ioobj *obj, + struct niobuf_remote *rnb, int *nr_local, + struct niobuf_local *lnb, + struct obd_trans_info *oti) +{ + struct ofd_object *fo; + int i, j, k, rc = 0, tot_bytes = 0; + + ENTRY; + LASSERT(env != NULL); + LASSERT(objcount == 1); + + if (unlikely(exp->exp_obd->obd_recovering)) { + struct ofd_thread_info *info = ofd_info(env); + + /* copied from ofd_precreate_object */ + /* XXX this should be consolidated to use the same code + * instead of a copy, due to the ongoing risk of bugs. */ + memset(&info->fti_attr, 0, sizeof(info->fti_attr)); + info->fti_attr.la_valid = LA_TYPE | LA_MODE; + info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | 0666; + info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME; + /* Initialize a/c/m time so any client timestamp will always + * be newer and update the inode. ctime = 0 is also handled + * specially in osd_inode_setattr(). See LU-221, LU-1042 */ + info->fti_attr.la_atime = 0; + info->fti_attr.la_mtime = 0; + info->fti_attr.la_ctime = 0; + + fo = ofd_object_find_or_create(env, ofd, fid, &info->fti_attr); + } else { + fo = ofd_object_find(env, ofd, fid); + } + + if (IS_ERR(fo)) + GOTO(out, rc = PTR_ERR(fo)); + LASSERT(fo != NULL); + + ofd_read_lock(env, fo); + if (!ofd_object_exists(fo)) { + CERROR("%s: BRW to missing obj "LPU64"/"LPU64"\n", + exp->exp_obd->obd_name, obj->ioo_id, obj->ioo_seq); + ofd_read_unlock(env, fo); + ofd_object_put(env, fo); + GOTO(out, rc = -ENOENT); + } + + /* Always sync if syncjournal parameter is set */ + oti->oti_sync_write = ofd->ofd_syncjournal; + + /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some + * space back if possible */ + ofd_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt); + + /* parse remote buffers to local buffers and prepare the latter */ + for (i = 0, j = 0; i < obj->ioo_bufcnt; i++) { + rc = dt_bufs_get(env, ofd_object_child(fo), + rnb + i, lnb + j, 1, + ofd_object_capa(env, fo)); + LASSERT(rc > 0); + LASSERT(rc <= PTLRPC_MAX_BRW_PAGES); + /* correct index for local buffers to continue with */ + for (k = 0; k < rc; k++) { + lnb[j+k].lnb_flags = rnb[i].rnb_flags; + if (!(rnb[i].rnb_flags & OBD_BRW_GRANTED)) + lnb[j+k].lnb_rc = -ENOSPC; + if (!(rnb[i].rnb_flags & OBD_BRW_ASYNC)) + oti->oti_sync_write = 1; + } + j += rc; + LASSERT(j <= PTLRPC_MAX_BRW_PAGES); + tot_bytes += rnb[i].rnb_len; + } + *nr_local = j; + LASSERT(*nr_local > 0 && *nr_local <= PTLRPC_MAX_BRW_PAGES); + + lprocfs_counter_add(ofd_obd(ofd)->obd_stats, + LPROC_OFD_WRITE_BYTES, tot_bytes); + rc = dt_write_prep(env, ofd_object_child(fo), lnb, *nr_local); + if (unlikely(rc != 0)) { + dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local); + ofd_read_unlock(env, fo); + /* ofd_grant_prepare_write() was called, so we must commit */ + ofd_grant_commit(env, exp, rc); + } + + RETURN(rc); +out: + /* let's still process incoming grant information packed in the oa, + * but without enforcing grant since we won't proceed with the write. + * Just like a read request actually. */ + ofd_grant_prepare_read(env, exp, oa); + RETURN(rc); +} + +int ofd_preprw(const struct lu_env* env, int cmd, struct obd_export *exp, + struct obdo *oa, int objcount, struct obd_ioobj *obj, + struct niobuf_remote *rnb, int *nr_local, + struct niobuf_local *lnb, struct obd_trans_info *oti, + struct lustre_capa *capa) +{ + struct ofd_device *ofd = ofd_exp(exp); + struct ofd_thread_info *info; + int rc = 0; + + if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT) && + ofd->ofd_destroys_in_progress == 0) { + /* don't fail lookups for orphan recovery, it causes + * later LBUGs when objects still exist during precreate */ + CDEBUG(D_INFO, "*** obd_fail_loc=%x ***\n",OBD_FAIL_OST_ENOENT); + RETURN(-ENOENT); + } + + info = ofd_info_init(env, exp); + + LASSERT(objcount == 1); + LASSERT(obj->ioo_bufcnt > 0); + + fid_ostid_unpack(&info->fti_fid, &oa->o_oi, 0); + if (cmd == OBD_BRW_WRITE) { + rc = ofd_auth_capa(exp, &info->fti_fid, oa->o_seq, + capa, CAPA_OPC_OSS_WRITE); + if (rc == 0) { + LASSERT(oa != NULL); + la_from_obdo(&info->fti_attr, oa, OBD_MD_FLGETATTR); + rc = ofd_preprw_write(env, exp, ofd, &info->fti_fid, + &info->fti_attr, oa, objcount, + obj, rnb, nr_local, lnb, oti); + } + } else if (cmd == OBD_BRW_READ) { + rc = ofd_auth_capa(exp, &info->fti_fid, oa->o_seq, + capa, CAPA_OPC_OSS_READ); + if (rc == 0) { + ofd_grant_prepare_read(env, exp, oa); + rc = ofd_preprw_read(env, ofd, &info->fti_fid, + &info->fti_attr, obj->ioo_bufcnt, + rnb, nr_local, lnb); + obdo_from_la(oa, &info->fti_attr, LA_ATIME); + } + } else { + CERROR("%s: wrong cmd %d received!\n", + exp->exp_obd->obd_name, cmd); + rc = -EPROTO; + } + RETURN(rc); +} + +static int +ofd_commitrw_read(const struct lu_env *env, struct ofd_device *ofd, + struct lu_fid *fid, int objcount, int niocount, + struct niobuf_local *lnb) +{ + struct ofd_object *fo; + + ENTRY; + + LASSERT(niocount > 0); + + fo = ofd_object_find(env, ofd, fid); + if (IS_ERR(fo)) + RETURN(PTR_ERR(fo)); + LASSERT(fo != NULL); + LASSERT(ofd_object_exists(fo)); + dt_bufs_put(env, ofd_object_child(fo), lnb, niocount); + + ofd_read_unlock(env, fo); + ofd_object_put(env, fo); + /* second put is pair to object_get in ofd_preprw_read */ + ofd_object_put(env, fo); + + RETURN(0); +} + +static int +ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd, + struct ofd_object *ofd_obj, struct lu_attr *la, + struct filter_fid *ff) +{ + struct ofd_thread_info *info = ofd_info(env); + __u64 valid = la->la_valid; + int rc; + struct thandle *th; + struct dt_object *dt_obj; + int ff_needed = 0; + + ENTRY; + + LASSERT(la); + + dt_obj = ofd_object_child(ofd_obj); + LASSERT(dt_obj != NULL); + + la->la_valid &= LA_UID | LA_GID; + + rc = ofd_attr_handle_ugid(env, ofd_obj, la, 0 /* !is_setattr */); + if (rc != 0) + GOTO(out, rc); + + if (ff != NULL) { + rc = ofd_object_ff_check(env, ofd_obj); + if (rc == -ENODATA) + ff_needed = 1; + else if (rc < 0) + GOTO(out, rc); + } + + if (!la->la_valid && !ff_needed) + /* no attributes to set */ + GOTO(out, rc = 0); + + th = ofd_trans_create(env, ofd); + if (IS_ERR(th)) + GOTO(out, rc = PTR_ERR(th)); + + if (la->la_valid) { + rc = dt_declare_attr_set(env, dt_obj, la, th); + if (rc) + GOTO(out_tx, rc); + } + + if (ff_needed) { + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); + rc = dt_declare_xattr_set(env, dt_obj, &info->fti_buf, + XATTR_NAME_FID, 0, th); + if (rc) + GOTO(out_tx, rc); + } + + /* We don't need a transno for this operation which will be re-executed + * anyway when the OST_WRITE (with a transno assigned) is replayed */ + rc = dt_trans_start_local(env, ofd->ofd_osd , th); + if (rc) + GOTO(out_tx, rc); + + /* set uid/gid */ + if (la->la_valid) { + rc = dt_attr_set(env, dt_obj, la, th, + ofd_object_capa(env, ofd_obj)); + if (rc) + GOTO(out_tx, rc); + } + + /* set filter fid EA */ + if (ff_needed) { + rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID, + 0, th, BYPASS_CAPA); + if (rc) + GOTO(out_tx, rc); + } + + EXIT; +out_tx: + dt_trans_stop(env, ofd->ofd_osd, th); +out: + la->la_valid = valid; + return rc; +} + +static int +ofd_commitrw_write(const struct lu_env *env, struct ofd_device *ofd, + struct lu_fid *fid, struct lu_attr *la, + struct filter_fid *ff, int objcount, + int niocount, struct niobuf_local *lnb, + struct obd_trans_info *oti, int old_rc) +{ + struct ofd_thread_info *info = ofd_info(env); + struct ofd_object *fo; + struct dt_object *o; + struct thandle *th; + int rc = 0; + int retries = 0; + + ENTRY; + + LASSERT(objcount == 1); + + fo = ofd_object_find(env, ofd, fid); + LASSERT(fo != NULL); + LASSERT(ofd_object_exists(fo)); + + o = ofd_object_child(fo); + LASSERT(o != NULL); + + if (old_rc) + GOTO(out, rc = old_rc); + + /* + * The first write to each object must set some attributes. It is + * important to set the uid/gid before calling + * dt_declare_write_commit() since quota enforcement is now handled in + * declare phases. + */ + rc = ofd_write_attr_set(env, ofd, fo, la, ff); + if (rc) + GOTO(out, rc); + + la->la_valid &= LA_ATIME | LA_MTIME | LA_CTIME; + +retry: + th = ofd_trans_create(env, ofd); + if (IS_ERR(th)) + GOTO(out, rc = PTR_ERR(th)); + + th->th_sync |= oti->oti_sync_write; + + rc = dt_declare_write_commit(env, o, lnb, niocount, th); + if (rc) + GOTO(out_stop, rc); + + if (la->la_valid) { + /* update [mac]time if needed */ + rc = dt_declare_attr_set(env, o, la, th); + if (rc) + GOTO(out_stop, rc); + } + + rc = ofd_trans_start(env, ofd, fo, th); + if (rc) + GOTO(out_stop, rc); + + rc = dt_write_commit(env, o, lnb, niocount, th); + if (rc) + GOTO(out_stop, rc); + + if (la->la_valid) { + rc = dt_attr_set(env, o, la, th, ofd_object_capa(env, fo)); + if (rc) + GOTO(out_stop, rc); + } + + /* get attr to return */ + dt_attr_get(env, o, la, ofd_object_capa(env, fo)); + +out_stop: + /* Force commit to make the just-deleted blocks + * reusable. LU-456 */ + if (rc == -ENOSPC) + th->th_sync = 1; + + ofd_trans_stop(env, ofd, th, rc); + if (rc == -ENOSPC && retries++ < 3) { + CDEBUG(D_INODE, "retry after force commit, retries:%d\n", + retries); + goto retry; + } + +out: + dt_bufs_put(env, o, lnb, niocount); + ofd_read_unlock(env, fo); + ofd_object_put(env, fo); + /* second put is pair to object_get in ofd_preprw_write */ + ofd_object_put(env, fo); + ofd_grant_commit(env, info->fti_exp, old_rc); + RETURN(rc); +} + +int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, + struct obdo *oa, int objcount, struct obd_ioobj *obj, + struct niobuf_remote *rnb, int npages, + struct niobuf_local *lnb, struct obd_trans_info *oti, + int old_rc) +{ + struct ofd_thread_info *info; + struct ofd_mod_data *fmd; + __u64 valid; + struct ofd_device *ofd = ofd_exp(exp); + struct filter_fid *ff = NULL; + int rc = 0; + + info = ofd_info(env); + ofd_oti2info(info, oti); + + LASSERT(npages > 0); + + fid_ostid_unpack(&info->fti_fid, &oa->o_oi, 0); + if (cmd == OBD_BRW_WRITE) { + /* Don't update timestamps if this write is older than a + * setattr which modifies the timestamps. b=10150 */ + + /* XXX when we start having persistent reservations this needs + * to be changed to ofd_fmd_get() to create the fmd if it + * doesn't already exist so we can store the reservation handle + * there. */ + valid = OBD_MD_FLUID | OBD_MD_FLGID; + fmd = ofd_fmd_find(exp, &info->fti_fid); + if (!fmd || fmd->fmd_mactime_xid < info->fti_xid) + valid |= OBD_MD_FLATIME | OBD_MD_FLMTIME | + OBD_MD_FLCTIME; + ofd_fmd_put(exp, fmd); + la_from_obdo(&info->fti_attr, oa, valid); + + if (oa->o_valid & OBD_MD_FLFID) { + ff = &info->fti_mds_fid; + ofd_prepare_fidea(ff, oa); + } + + rc = ofd_commitrw_write(env, ofd, &info->fti_fid, + &info->fti_attr, ff, objcount, npages, + lnb, oti, old_rc); + if (rc == 0) + obdo_from_la(oa, &info->fti_attr, + OFD_VALID_FLAGS | LA_GID | LA_UID); + else + obdo_from_la(oa, &info->fti_attr, LA_GID | LA_UID); + + if (ofd_grant_prohibit(exp, ofd)) + /* Trick to prevent clients from waiting for bulk write + * in flight since they won't get any grant in the reply + * anyway so they had better firing the sync write RPC + * straight away */ + oa->o_valid |= OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA; + } else if (cmd == OBD_BRW_READ) { + struct ldlm_namespace *ns = ofd->ofd_namespace; + + /* If oa != NULL then ofd_preprw_read updated the inode + * atime and we should update the lvb so that other glimpses + * will also get the updated value. bug 5972 */ + if (oa && ns && ns->ns_lvbo && ns->ns_lvbo->lvbo_update) { + struct ldlm_resource *rs = NULL; + + ofd_build_resid(&info->fti_fid, &info->fti_resid); + rs = ldlm_resource_get(ns, NULL, &info->fti_resid, + LDLM_EXTENT, 0); + if (rs != NULL) { + ns->ns_lvbo->lvbo_update(rs, NULL, 1); + ldlm_resource_putref(rs); + } + } + rc = ofd_commitrw_read(env, ofd, &info->fti_fid, objcount, + npages, lnb); + if (old_rc) + rc = old_rc; + } else { + LBUG(); + rc = -EPROTO; + } + + ofd_info2oti(info, oti); + RETURN(rc); +} diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c index 6135f0f..3f53b1b 100644 --- a/lustre/ofd/ofd_obd.c +++ b/lustre/ofd/ofd_obd.c @@ -841,6 +841,8 @@ struct obd_ops ofd_obd_ops = { .o_set_info_async = ofd_set_info_async, .o_get_info = ofd_get_info, .o_statfs = ofd_statfs, + .o_preprw = ofd_preprw, + .o_commitrw = ofd_commitrw, .o_init_export = ofd_init_export, .o_destroy_export = ofd_destroy_export, .o_postrecov = ofd_obd_postrecov,