* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2014 Intel Corporation.
+ * Copyright (c) 2012, 2016, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_FILTER
+#include <linux/kthread.h>
#include "ofd_internal.h"
struct ofd_inconsistency_item {
OBD_ALLOC_PTR(lr);
if (unlikely(lr == NULL))
- GOTO(out, rc = -ENOMEM);
+ GOTO(out_unlocked, rc = -ENOMEM);
lr->lr_event = LE_PAIRS_VERIFY;
lr->lr_active = LFSCK_TYPE_LAYOUT;
GOTO(out, rc = 0);
+out_unlocked:
+ spin_lock(&ofd->ofd_inconsistency_lock);
out:
thread_set_flags(thread, SVC_STOPPED);
wake_up_all(&thread->t_ctl_waitq);
*nr_local = 0;
for (i = 0, j = 0; i < niocount; i++) {
rc = dt_bufs_get(env, ofd_object_child(fo), rnb + i,
- lnb + j, 0, ofd_object_capa(env, fo));
+ lnb + j, 0);
if (unlikely(rc < 0))
GOTO(buf_put, rc);
LASSERT(rc <= PTLRPC_MAX_BRW_PAGES);
}
LASSERT(*nr_local > 0 && *nr_local <= PTLRPC_MAX_BRW_PAGES);
- rc = dt_attr_get(env, ofd_object_child(fo), la,
- ofd_object_capa(env, fo));
+ rc = dt_attr_get(env, ofd_object_child(fo), la);
if (unlikely(rc))
GOTO(buf_put, rc);
oseq = ofd_seq_load(env, ofd, seq);
if (IS_ERR(oseq)) {
- CERROR("%s: Can't find FID Sequence "LPX64": rc = %d\n",
+ CERROR("%s: Can't find FID Sequence %#llx: rc = %d\n",
ofd_name(ofd), seq, (int)PTR_ERR(oseq));
GOTO(out, rc = -EINVAL);
}
*nr_local = 0;
for (i = 0, j = 0; i < obj->ioo_bufcnt; i++) {
rc = dt_bufs_get(env, ofd_object_child(fo),
- rnb + i, lnb + j, 1,
- ofd_object_capa(env, fo));
+ rnb + i, lnb + j, 1);
if (unlikely(rc < 0))
GOTO(err, rc);
LASSERT(rc <= PTLRPC_MAX_BRW_PAGES);
lnb[j+k].lnb_flags = rnb[i].rnb_flags;
if (!(rnb[i].rnb_flags & OBD_BRW_GRANTED))
lnb[j+k].lnb_rc = -ENOSPC;
-
- /* remote client can't break through quota */
- if (exp_connect_rmtclient(exp))
- lnb[j+k].lnb_flags &= ~OBD_BRW_NOQUOTA;
}
j += rc;
*nr_local += rc;
err:
dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local);
ofd_read_unlock(env, fo);
+ ofd_object_put(env, fo);
/* ofd_grant_prepare_write() was called, so we must commit */
- ofd_grant_commit(env, exp, rc);
+ ofd_grant_commit(exp, oa->o_grant_used, rc);
out:
/* let's still process incoming grant information packed in the oa,
* but without enforcing grant since we won't proceed with the write.
* \param[in] rnb remote buffers
* \param[in] nr_local number of local buffers
* \param[in] lnb local buffers
- * \param[in] oti request data from OST
- * \param[in] capa capability
*
* \retval 0 on successful prepare
* \retval negative value on error
int ofd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
struct obdo *oa, int objcount, struct obd_ioobj *obj,
struct niobuf_remote *rnb, int *nr_local,
- struct niobuf_local *lnb, struct obd_trans_info *oti,
- struct lustre_capa *capa)
+ struct niobuf_local *lnb)
{
struct tgt_session_info *tsi = tgt_ses_info(env);
struct ofd_device *ofd = ofd_exp(exp);
}
if (tgt_ses_req(tsi) == NULL) { /* echo client case */
- LASSERT(oti != NULL);
info = ofd_info_init(env, exp);
- ofd_oti2info(info, oti);
jobid = NULL;
} else {
info = tsi2ofd_info(tsi);
LASSERT(oa != NULL);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT)) {
+ if (OBD_FAIL_CHECK(OBD_FAIL_SRV_ENOENT)) {
struct ofd_seq *oseq;
oseq = ofd_seq_load(env, ofd, ostid_seq(&oa->o_oi));
LASSERT(obj->ioo_bufcnt > 0);
if (cmd == OBD_BRW_WRITE) {
- rc = ofd_auth_capa(exp, fid, ostid_seq(&oa->o_oi),
- capa, CAPA_OPC_OSS_WRITE);
- if (rc == 0) {
- la_from_obdo(&info->fti_attr, oa, OBD_MD_FLGETATTR);
- rc = ofd_preprw_write(env, exp, ofd, fid,
- &info->fti_attr, oa, objcount,
- obj, rnb, nr_local, lnb, jobid);
- }
+ la_from_obdo(&info->fti_attr, oa, OBD_MD_FLGETATTR);
+ rc = ofd_preprw_write(env, exp, ofd, fid, &info->fti_attr, oa,
+ objcount, obj, rnb, nr_local, lnb, jobid);
} else if (cmd == OBD_BRW_READ) {
- rc = ofd_auth_capa(exp, fid, ostid_seq(&oa->o_oi),
- capa, CAPA_OPC_OSS_READ);
- if (rc == 0) {
- ofd_grant_prepare_read(env, exp, oa);
- rc = ofd_preprw_read(env, exp, ofd, fid,
- &info->fti_attr, oa,
- obj->ioo_bufcnt, rnb, nr_local,
- lnb, jobid);
- obdo_from_la(oa, &info->fti_attr, LA_ATIME);
- }
+ ofd_grant_prepare_read(env, exp, oa);
+ rc = ofd_preprw_read(env, exp, ofd, fid, &info->fti_attr, oa,
+ obj->ioo_bufcnt, rnb, nr_local, lnb,
+ jobid);
+ obdo_from_la(oa, &info->fti_attr, LA_ATIME);
} else {
CERROR("%s: wrong cmd %d received!\n",
exp->exp_obd->obd_name, cmd);
/* set uid/gid */
if (la->la_valid) {
- rc = dt_attr_set(env, dt_obj, la, th,
- ofd_object_capa(env, ofd_obj));
+ rc = dt_attr_set(env, dt_obj, la, th);
if (rc)
GOTO(out_tx, rc);
}
GOTO(out_tx, rc);
rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID,
- 0, th, BYPASS_CAPA);
+ 0, th);
if (rc == 0) {
ofd_obj->ofo_pfid.f_seq = le64_to_cpu(ff->ff_parent.f_seq);
ofd_obj->ofo_pfid.f_oid = le32_to_cpu(ff->ff_parent.f_oid);
* \param[in] objcount always 1
* \param[in] niocount number of local buffers
* \param[in] lnb local buffers
+ * \param[in] granted grant space consumed for the bulk I/O
* \param[in] old_rc result of processing at this point
*
* \retval 0 on successful commit
ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
struct ofd_device *ofd, const struct lu_fid *fid,
struct lu_attr *la, struct filter_fid *ff, int objcount,
- int niocount, struct niobuf_local *lnb, int old_rc)
+ int niocount, struct niobuf_local *lnb,
+ unsigned long granted, int old_rc)
{
- struct ofd_thread_info *info = ofd_info(env);
- struct ofd_object *fo;
- struct dt_object *o;
- struct thandle *th;
- int rc = 0;
- int retries = 0;
- int i;
struct filter_export_data *fed = &exp->exp_filter_data;
- bool soft_sync = false;
- bool cb_registered = false;
+ struct ofd_object *fo;
+ struct dt_object *o;
+ struct thandle *th;
+ int rc = 0;
+ int rc2 = 0;
+ int retries = 0;
+ int i;
+ bool soft_sync = false;
+ bool cb_registered = false;
+ bool fake_write = false;
ENTRY;
la->la_valid &= LA_ATIME | LA_MTIME | LA_CTIME;
+ /* do fake write, to simulate the write case for performance testing */
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_FAKE_RW)) {
+ struct niobuf_local *last = &lnb[niocount - 1];
+ __u64 file_size = last->lnb_file_offset + last->lnb_len;
+ __u64 valid = la->la_valid;
+
+ la->la_valid = LA_SIZE;
+ la->la_size = 0;
+ rc = dt_attr_get(env, o, la);
+ if (rc < 0 && rc != -ENOENT)
+ GOTO(out, rc);
+
+ if (file_size < la->la_size)
+ file_size = la->la_size;
+
+ /* dirty inode by setting file size */
+ la->la_valid = valid | LA_SIZE;
+ la->la_size = file_size;
+
+ fake_write = true;
+ }
+
retry:
th = ofd_trans_create(env, ofd);
if (IS_ERR(th))
if (OBD_FAIL_CHECK(OBD_FAIL_OST_DQACQ_NET))
GOTO(out_stop, rc = -EINPROGRESS);
- rc = dt_declare_write_commit(env, o, lnb, niocount, th);
- if (rc)
- GOTO(out_stop, rc);
+ if (likely(!fake_write)) {
+ rc = dt_declare_write_commit(env, o, lnb, niocount, th);
+ if (rc)
+ GOTO(out_stop, rc);
+ }
if (la->la_valid) {
/* update [mac]time if needed */
if (rc)
GOTO(out_stop, rc);
- rc = dt_write_commit(env, o, lnb, niocount, th);
- if (rc)
- GOTO(out_stop, rc);
+ if (likely(!fake_write)) {
+ rc = dt_write_commit(env, o, lnb, niocount, th);
+ if (rc)
+ GOTO(out_stop, rc);
+ }
if (la->la_valid) {
- rc = dt_attr_set(env, o, la, th, ofd_object_capa(env, fo));
+ rc = dt_attr_set(env, o, la, th);
if (rc)
GOTO(out_stop, rc);
}
/* get attr to return */
- rc = dt_attr_get(env, o, la, ofd_object_capa(env, fo));
+ rc = dt_attr_get(env, o, la);
out_stop:
/* Force commit to make the just-deleted blocks
cb_registered = true;
}
- ofd_trans_stop(env, ofd, th, rc);
+ if (rc == 0 && granted > 0) {
+ if (ofd_grant_commit_cb_add(th, exp, granted) == 0)
+ granted = 0;
+ }
+
+ rc2 = ofd_trans_stop(env, ofd, th, rc);
+ if (!rc)
+ rc = rc2;
if (rc == -ENOSPC && retries++ < 3) {
CDEBUG(D_INODE, "retry after force commit, retries:%d\n",
retries);
ofd_object_put(env, fo);
/* second put is pair to object_get in ofd_preprw_write */
ofd_object_put(env, fo);
- ofd_grant_commit(env, info->fti_exp, old_rc);
+ if (granted > 0)
+ ofd_grant_commit(exp, granted, old_rc);
RETURN(rc);
}
* \param[in] rnb remote buffers
* \param[in] npages number of local buffers
* \param[in] lnb local buffers
- * \param[in] oti request data from OST
* \param[in] old_rc result of processing at this point
*
* \retval 0 on successful commit
int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
struct obdo *oa, int objcount, struct obd_ioobj *obj,
struct niobuf_remote *rnb, int npages,
- struct niobuf_local *lnb, struct obd_trans_info *oti,
- int old_rc)
+ struct niobuf_local *lnb, int old_rc)
{
struct ofd_thread_info *info = ofd_info(env);
struct ofd_mod_data *fmd;
}
rc = ofd_commitrw_write(env, exp, ofd, fid, &info->fti_attr,
- ff, objcount, npages, lnb, old_rc);
+ ff, objcount, npages, lnb,
+ oa->o_grant_used, old_rc);
if (rc == 0)
obdo_from_la(oa, &info->fti_attr,
OFD_VALID_FLAGS | LA_GID | LA_UID);
rc = -EPROTO;
}
- if (oti != NULL)
- ofd_info2oti(info, oti);
RETURN(rc);
}