* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2012, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include "ofd_internal.h"
-static int ofd_preprw_read(const struct lu_env *env, struct ofd_device *ofd,
- struct lu_fid *fid, struct lu_attr *la, int niocount,
+static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
+ struct ofd_device *ofd, struct lu_fid *fid,
+ struct lu_attr *la, int niocount,
struct niobuf_remote *rnb, int *nr_local,
- struct niobuf_local *lnb)
+ struct niobuf_local *lnb,
+ struct obd_trans_info *oti)
{
struct ofd_object *fo;
int i, j, rc, tot_bytes = 0;
GOTO(buf_put, rc);
lprocfs_counter_add(ofd_obd(ofd)->obd_stats,
LPROC_OFD_READ_BYTES, tot_bytes);
+ ofd_counter_incr(exp, LPROC_OFD_STATS_READ,
+ oti->oti_jobid, tot_bytes);
RETURN(0);
buf_put:
*nr_local = j;
LASSERT(*nr_local > 0 && *nr_local <= PTLRPC_MAX_BRW_PAGES);
- lprocfs_counter_add(ofd_obd(ofd)->obd_stats,
- LPROC_OFD_WRITE_BYTES, tot_bytes);
rc = dt_write_prep(env, ofd_object_child(fo), lnb, *nr_local);
if (unlikely(rc != 0)) {
dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local);
ofd_read_unlock(env, fo);
/* ofd_grant_prepare_write() was called, so we must commit */
ofd_grant_commit(env, exp, rc);
+ GOTO(out, rc);
}
- RETURN(rc);
+ lprocfs_counter_add(ofd_obd(ofd)->obd_stats,
+ LPROC_OFD_WRITE_BYTES, tot_bytes);
+ ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE,
+ oti->oti_jobid, tot_bytes);
+ RETURN(0);
out:
/* let's still process incoming grant information packed in the oa,
* but without enforcing grant since we won't proceed with the write.
* Just like a read request actually. */
ofd_grant_prepare_read(env, exp, oa);
- RETURN(rc);
+ return rc;
}
int ofd_preprw(const struct lu_env* env, int cmd, struct obd_export *exp,
struct ofd_thread_info *info;
int rc = 0;
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT) &&
- ofd->ofd_destroys_in_progress == 0) {
- /* don't fail lookups for orphan recovery, it causes
- * later LBUGs when objects still exist during precreate */
- CDEBUG(D_INFO, "*** obd_fail_loc=%x ***\n",OBD_FAIL_OST_ENOENT);
- RETURN(-ENOENT);
- }
-
+ rc = lu_env_refill((struct lu_env *)env);
+ LASSERT(rc == 0);
info = ofd_info_init(env, exp);
+ LASSERT(oa != NULL);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT)) {
+ struct ofd_seq *oseq;
+ oseq = ofd_seq_load(env, ofd, oa->o_seq);
+ if (oseq == NULL) {
+ CERROR("%s: Can not find seq for "LPU64":"LPU64"\n",
+ ofd_name(ofd), oa->o_seq, oa->o_id);
+ RETURN(-EINVAL);
+ }
+
+ if (oseq->os_destroys_in_progress == 0) {
+ /* don't fail lookups for orphan recovery, it causes
+ * later LBUGs when objects still exist during
+ * precreate */
+ ofd_seq_put(env, oseq);
+ RETURN(-ENOENT);
+ }
+ ofd_seq_put(env, oseq);
+ }
+
LASSERT(objcount == 1);
LASSERT(obj->ioo_bufcnt > 0);
rc = ofd_auth_capa(exp, &info->fti_fid, oa->o_seq,
capa, CAPA_OPC_OSS_WRITE);
if (rc == 0) {
- LASSERT(oa != NULL);
la_from_obdo(&info->fti_attr, oa, OBD_MD_FLGETATTR);
rc = ofd_preprw_write(env, exp, ofd, &info->fti_fid,
&info->fti_attr, oa, objcount,
capa, CAPA_OPC_OSS_READ);
if (rc == 0) {
ofd_grant_prepare_read(env, exp, oa);
- rc = ofd_preprw_read(env, ofd, &info->fti_fid,
+ rc = ofd_preprw_read(env, exp, ofd, &info->fti_fid,
&info->fti_attr, obj->ioo_bufcnt,
- rnb, nr_local, lnb);
+ rnb, nr_local, lnb, oti);
obdo_from_la(oa, &info->fti_attr, LA_ATIME);
}
} else {