-/*
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.gnu.org/licenses/gpl-2.0.html
- *
- * GPL HEADER END
- */
+// SPDX-License-Identifier: GPL-2.0
+
/*
* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
* Copyright (c) 2012, 2017, Intel Corporation.
*/
+
/*
* This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lustre/ofd/ofd_dev.c
*
* This file contains OSD API methods for OBD Filter Device (OFD),
* request handlers and supplemental functions to set OFD up and clean it up.
* Author: Mike Pershin <mike.pershin@intel.com>
* Author: Johann Lombardi <johann.lombardi@intel.com>
*/
+
/*
* The OBD Filter Device (OFD) module belongs to the Object Storage
* Server stack and connects the RPC oriented Unified Target (TGT)
#include <lustre_quota.h>
#include <lustre_nodemap.h>
#include <lustre_log.h>
+#include <llog_swab.h>
+#include <lustre_swab.h>
#include <linux/falloc.h>
#include "ofd_internal.h"
m->ofd_dt_dev.dd_lu_dev.ld_site->ls_top_dev = &m->ofd_dt_dev.dd_lu_dev;
out:
- if (data)
- OBD_FREE_PTR(data);
+ OBD_FREE_PTR(data);
RETURN(rc);
}
*/
static int ofd_stack_init(const struct lu_env *env,
struct ofd_device *m, struct lustre_cfg *cfg,
- u32 *lmd_flags)
+ unsigned long *lmd_flags)
{
const char *dev = lustre_cfg_string(cfg, 0);
struct lu_device *d;
lmd = s2lsi(lmi->lmi_sb)->lsi_lmd;
if (lmd) {
- if (lmd->lmd_flags & LMD_FLG_SKIP_LFSCK)
+ if (test_bit(LMD_FLG_SKIP_LFSCK, lmd->lmd_flags))
m->ofd_skip_lfsck = 1;
- if (lmd->lmd_flags & LMD_FLG_NO_PRECREATE)
- m->ofd_no_precreate = 1;
- *lmd_flags = lmd->lmd_flags;
+ if (test_bit(LMD_FLG_NO_CREATE, lmd->lmd_flags))
+ m->ofd_lut.lut_no_create = 1;
+ bitmap_copy(lmd_flags, lmd->lmd_flags, LMD_FLG_NUM_FLAGS);
}
/* find bottom osd */
d = m->ofd_osd_exp->exp_obd->obd_lu_dev;
LASSERT(d);
m->ofd_osd = lu2dt_dev(d);
+ if (m->ofd_osd->dd_rdonly)
+ ofd_obd(m)->obd_read_only = 1;
snprintf(info->fti_u.name, sizeof(info->fti_u.name),
"%s-osd", lustre_cfg_string(cfg, 0));
}
lu_site_purge(env, top->ld_site, ~0);
- if (!cfs_hash_is_empty(top->ld_site->ls_obj_hash)) {
- LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_OTHER, NULL);
- lu_site_print(env, top->ld_site, &msgdata, lu_cdebug_printer);
- }
-
+ lu_site_print(env, top->ld_site, &top->ld_site->ls_obj_hash.nelems,
+ D_OTHER, lu_cdebug_printer);
LASSERT(m->ofd_osd_exp);
obd_disconnect(m->ofd_osd_exp);
return (*p)(env, cookie, LUSTRE_OST_NAME"-object@%p", o);
}
-static struct lu_object_operations ofd_obj_ops = {
+static const struct lu_object_operations ofd_obj_ops = {
.loo_object_init = ofd_object_init,
.loo_object_free = ofd_object_free,
.loo_object_print = ofd_object_print
LASSERTF(rc == 0, "register namespace failed: rc = %d\n", rc);
target_recovery_init(&ofd->ofd_lut, tgt_request_handle);
- OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_OST_PREPARE_DELAY, OBD_FAIL_ONCE,
+ CFS_FAIL_TIMEOUT_ORSET(OBD_FAIL_OST_PREPARE_DELAY, CFS_FAIL_ONCE,
(OBD_TIMEOUT_DEFAULT + 1) / 4);
LASSERT(obd->obd_no_conn);
spin_lock(&obd->obd_dev_lock);
obd->obd_no_conn = 0;
spin_unlock(&obd->obd_dev_lock);
- if (obd->obd_recovering == 0)
+ if (!test_bit(OBDF_RECOVERING, obd->obd_flags))
ofd_postrecov(env, ofd);
RETURN(rc);
/**
* lu_device_operations matrix for OFD device.
*/
-static struct lu_device_operations ofd_lu_ops = {
+static const struct lu_device_operations ofd_lu_ops = {
.ldo_object_alloc = ofd_object_alloc,
.ldo_process_config = ofd_process_config,
.ldo_recovery_complete = ofd_recovery_complete,
.ldo_prepare = ofd_prepare,
};
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 14, 53, 0)
-/**
- * Expose OSD statistics to OFD layer.
- *
- * The osd interfaces to the backend file system exposes useful data
- * such as brw_stats and read or write cache states. This same data
- * needs to be exposed into the obdfilter (ofd) layer to maintain
- * backwards compatibility. This function creates the symlinks in the
- * proc layer to enable this.
- *
- * \param[in] ofd OFD device
- */
-static void ofd_procfs_add_brw_stats_symlink(struct ofd_device *ofd)
-{
- struct obd_device *obd = ofd_obd(ofd);
- struct obd_device *osd_obd = ofd->ofd_osd_exp->exp_obd;
- struct kobj_type *osd_type;
- int i;
-
- osd_type = get_ktype(&ofd->ofd_osd->dd_kobj);
- for (i = 0; osd_type->default_attrs[i]; i++) {
- if (strcmp(osd_type->default_attrs[i]->name,
- "read_cache_enable") == 0) {
- ofd->ofd_read_cache_enable =
- osd_type->default_attrs[i];
- }
-
- if (strcmp(osd_type->default_attrs[i]->name,
- "readcache_max_filesize") == 0) {
- ofd->ofd_read_cache_max_filesize =
- osd_type->default_attrs[i];
- }
-
- if (strcmp(osd_type->default_attrs[i]->name,
- "writethrough_cache_enable") == 0) {
- ofd->ofd_write_cache_enable =
- osd_type->default_attrs[i];
- }
- }
-
- if (obd->obd_proc_entry == NULL)
- return;
-
- lprocfs_add_symlink("brw_stats", obd->obd_proc_entry,
- "../../%s/%s/brw_stats",
- osd_obd->obd_type->typ_name, obd->obd_name);
-}
-#endif
-
/**
* Cleanup all procfs entries in OFD.
*
tgt_tunables_fini(&ofd->ofd_lut);
lprocfs_free_per_client_stats(obd);
lprocfs_obd_cleanup(obd);
- lprocfs_free_obd_stats(obd);
+ ldebugfs_free_obd_stats(obd);
lprocfs_job_stats_fini(obd);
}
GOTO(out_name, rc = -ENOMEM);
rc = seq_server_init(env, ss->ss_server_seq, ofd->ofd_osd, obd_name,
- LUSTRE_SEQ_SERVER, ss);
+ LUSTRE_SEQ_SERVER, ss, false);
if (rc) {
CERROR("%s: seq server init error: rc = %d\n", obd_name, rc);
GOTO(out_server, rc);
void *key, *val = NULL;
int keylen, vallen, rc = 0;
bool is_grant_shrink;
+ ktime_t kstart = ktime_get();
ENTRY;
if (is_grant_shrink) {
body = req_capsule_client_get(tsi->tsi_pill, &RMF_OST_BODY);
+ /*
+ * Grant info is already synced with the client at
+ * reconnect time, so clear it for resent or replayed
+ * requests; otherwise the outdated grant count carried
+ * in the RPC would de-sync the grant counters.
+ */
+ if (lustre_msg_get_flags(req->rq_reqmsg) &
+ (MSG_RESENT | MSG_REPLAY)) {
+ DEBUG_REQ(D_CACHE, req,
+ "clear resent/replay req grant info");
+ body->oa.o_valid &= ~OBD_MD_FLGRANT;
+ }
+
repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
*repbody = *body;
rc = -EOPNOTSUPP;
}
ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_SET_INFO,
- tsi->tsi_jobid, 1);
+ tsi->tsi_jobid, ktime_us_delta(ktime_get(), kstart));
RETURN(rc);
}
return rc;
CDEBUG(D_OTHER, "ost lock [%llu,%llu], lh=%p\n", begin, end, &lh);
- tgt_extent_unlock(&lh, LCK_PR);
+ tgt_data_unlock(&lh, LCK_PR);
return 0;
}
RETURN(rc);
}
+/**
+ * ofd_fid2path() - replace an OST object FID with its parent FID.
+ * @info: Per-thread common data shared by ost level handlers; provides
+ *        the environment and the export used to locate the OFD device.
+ * @fp: User-provided fid2path arguments. @fp->gf_fid names the object to
+ *      look up on entry and is overwritten with the parent FID (with
+ *      f_ver cleared) on success.
+ *
+ * Part of the OST layer implementation of lfs fid2path.
+ *
+ * Return: 0 if the lookup was successful,
+ *         -EINVAL if the FID is not sane or not namespace visible,
+ *         -ENOENT if the object cannot be found or does not exist,
+ *         or another negative errno from the object lookup or from
+ *         ofd_object_ff_load()
+ */
+static int ofd_fid2path(struct ofd_thread_info *info,
+			struct getinfo_fid2path *fp)
+{
+	struct ofd_device *dev = ofd_exp(info->fti_exp);
+	struct lu_fid *fid = &fp->gf_fid;
+	struct ofd_object *obj;
+	int rc;
+
+	ENTRY;
+
+	if (!fid_is_sane(fid))
+		RETURN(-EINVAL);
+
+	if (!fid_is_namespace_visible(fid)) {
+		CDEBUG(D_IOCTL,
+		       "%s: "DFID" is invalid, f_seq should be >= %#llx, or f_oid != 0, or f_ver == 0\n",
+		       ofd_name(dev), PFID(fid),
+		       (__u64)FID_SEQ_NORMAL);
+		RETURN(-EINVAL);
+	}
+
+	obj = ofd_object_find(info->fti_env, dev, fid);
+	if (IS_ERR_OR_NULL(obj)) {
+		/* a NULL result is reported the same way as a missing object */
+		rc = obj ? PTR_ERR(obj) : -ENOENT;
+		CDEBUG(D_IOCTL, "%s: cannot find "DFID": rc=%d\n",
+		       ofd_name(dev), PFID(fid), rc);
+		RETURN(rc);
+	}
+
+	if (!ofd_object_exists(obj))
+		GOTO(out, rc = -ENOENT);
+
+	rc = ofd_object_ff_load(info->fti_env, obj, false);
+	if (rc) {
+		CDEBUG(D_IOCTL, "%s: ff_load failed for "DFID": rc=%d\n",
+		       ofd_name(dev), PFID(fid), rc);
+		GOTO(out, rc);
+	}
+
+	/* hand back the parent FID held in the object's ofo_ff, version cleared */
+	*fid = obj->ofo_ff.ff_parent;
+	fid->f_ver = 0;
+
+out:
+	/* every path that reaches this label holds a valid object reference */
+	ofd_object_put(info->fti_env, obj);
+
+	RETURN(rc);
+}
+
+/**
+ * ofd_rpc_fid2path() - serve a KEY_FID2PATH get-info request.
+ * @tsi: target session info; its capsule tells whether the request
+ *       must be byte-swapped
+ * @info: per-thread OFD handler data, passed through to ofd_fid2path()
+ * @key: request key buffer: KEY_FID2PATH rounded up to 8 bytes, followed
+ *       by a struct getinfo_fid2path
+ * @keylen: size of @key in bytes
+ * @val: reply buffer that receives the struct getinfo_fid2path
+ * @vallen: size of @val in bytes
+ *
+ * @vallen is taken from the client request by the caller and @keylen is
+ * presumably the client-supplied key size, so both are checked before
+ * the fid2path structure is read from @key or copied into @val.
+ *
+ * Return: 0 on success, -EINVAL if either buffer is too small to hold a
+ * struct getinfo_fid2path, or the negative errno from ofd_fid2path()
+ */
+static int ofd_rpc_fid2path(struct tgt_session_info *tsi,
+			    struct ofd_thread_info *info,
+			    void *key, int keylen,
+			    void *val, int vallen)
+{
+	const size_t fp_offset = round_up(sizeof(KEY_FID2PATH), 8);
+	struct getinfo_fid2path *fpout, *fpin;
+	int rc;
+
+	ENTRY;
+
+	/* reject short buffers: avoids reading past @key or writing past @val */
+	if (keylen < 0 || (size_t)keylen < fp_offset + sizeof(*fpin) ||
+	    vallen < 0 || (size_t)vallen < sizeof(*fpout))
+		RETURN(-EINVAL);
+
+	fpin = key + fp_offset;
+	fpout = val;
+
+	if (req_capsule_req_need_swab(tsi->tsi_pill))
+		lustre_swab_fid2path(fpin);
+
+	/* start the reply from the request; ofd_fid2path() rewrites gf_fid */
+	memcpy(fpout, fpin, sizeof(*fpin));
+
+	rc = ofd_fid2path(info, fpout);
+	RETURN(rc);
+}
/**
* OFD request handler for OST_GET_INFO RPC.
* - KEY_LAST_ID (obsolete)
* - KEY_FIEMAP
* - KEY_LAST_FID
+ * - KEY_FID2PATH
*
* This function reads needed data from storage and fills reply with it.
*
void *key;
int keylen;
int replylen, rc = 0;
+ ktime_t kstart = ktime_get();
ENTRY;
oseq = ofd_seq_load(tsi->tsi_env, ofd,
ostid_seq(&fti->fti_ostid));
if (IS_ERR(oseq))
- RETURN(PTR_ERR(oseq));
+ RETURN(-EFAULT);
rc = ostid_to_fid(fid, &oseq->os_oi,
ofd->ofd_lut.lut_lsd.lsd_osd_index);
if (rc != 0)
- GOTO(out_put, rc);
+ GOTO(out_put, rc = -EFAULT);
CDEBUG(D_HA, "%s: LAST FID is "DFID"\n", ofd_name(ofd),
PFID(fid));
out_put:
ofd_seq_put(tsi->tsi_env, oseq);
+ } else if (KEY_IS(KEY_FID2PATH)) {
+ __u32 *vallen;
+ void *valout;
+
+ req_capsule_extend(tsi->tsi_pill, &RQF_MDS_FID2PATH);
+ vallen = req_capsule_client_get(tsi->tsi_pill,
+ &RMF_GETINFO_VALLEN);
+ if (!vallen) {
+ CDEBUG(D_IOCTL,
+ "%s: cannot get RMF_GETINFO_VALLEN buffer\n",
+ tgt_name(tsi->tsi_tgt));
+ RETURN(err_serious(-EPROTO));
+ }
+
+ req_capsule_set_size(tsi->tsi_pill, &RMF_GETINFO_VAL,
+ RCL_SERVER, *vallen);
+ rc = req_capsule_server_pack(tsi->tsi_pill);
+ if (rc)
+ RETURN(err_serious(rc));
+
+ valout = req_capsule_server_get(tsi->tsi_pill,
+ &RMF_GETINFO_VAL);
+ if (!valout) {
+ CDEBUG(D_IOCTL,
+ "%s: cannot get get-info RPC out buffer\n",
+ tgt_name(tsi->tsi_tgt));
+ RETURN(-ENOMEM);
+ }
+ rc = ofd_rpc_fid2path(tsi, fti, key, keylen, valout, *vallen);
} else {
CERROR("%s: not supported key %s\n", tgt_name(tsi->tsi_tgt),
(char *)key);
rc = -EOPNOTSUPP;
}
ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_GET_INFO,
- tsi->tsi_jobid, 1);
+ tsi->tsi_jobid, ktime_us_delta(ktime_get(), kstart));
RETURN(rc);
}
struct ofd_object *fo;
__u64 flags = 0;
enum ldlm_mode lock_mode = LCK_PR;
+ ktime_t kstart = ktime_get();
bool srvlock;
int rc;
ENTRY;
repbody->oa.o_layout_version =
fo->ofo_ff.ff_layout_version + fo->ofo_ff.ff_range;
- CDEBUG(D_INODE, DFID": get layout version: %u\n",
+ CDEBUG(D_INODE, "%s:"DFID": get layout version: %#x\n",
+ tsi->tsi_tgt->lut_obd->obd_name,
PFID(&tsi->tsi_fid),
repbody->oa.o_layout_version);
}
ofd_object_put(tsi->tsi_env, fo);
out:
if (srvlock)
- tgt_extent_unlock(&lh, lock_mode);
+ tgt_data_unlock(&lh, lock_mode);
ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_GETATTR,
- tsi->tsi_jobid, 1);
+ tsi->tsi_jobid, ktime_us_delta(ktime_get(), kstart));
repbody->oa.o_valid |= OBD_MD_FLFLAGS;
repbody->oa.o_flags = OBD_FL_FLUSH;
struct ost_body *repbody;
struct ldlm_resource *res;
struct ofd_object *fo;
+ ktime_t kstart = ktime_get();
int rc = 0;
ENTRY;
OFD_VALID_FLAGS | LA_UID | LA_GID | LA_PROJID);
ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_SETATTR,
- tsi->tsi_jobid, 1);
+ tsi->tsi_jobid, ktime_us_delta(ktime_get(), kstart));
EXIT;
out_put:
ofd_object_put(tsi->tsi_env, fo);
* otherwise concurrent destroy can make the object unavailable
* for 2nd lu_object_find() waiting for the first reference
* to go... deadlock! */
- res = ldlm_resource_get(ofd->ofd_namespace, NULL,
- &tsi->tsi_resid, LDLM_EXTENT, 0);
+ res = ldlm_resource_get(ofd->ofd_namespace, &tsi->tsi_resid,
+ LDLM_EXTENT, 0);
if (!IS_ERR(res)) {
ldlm_res_lvbo_update(res, NULL, 0);
ldlm_resource_putref(res);
u64 oid;
int skip_orphan;
int rc = 0;
+ char *target_start;
+ int target_len;
ENTRY;
LASSERT(exp != NULL);
skip_orphan = !!(exp_connect_flags(exp) & OBD_CONNECT_SKIP_ORPHAN);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_NODESTROY))
+ if (CFS_FAIL_CHECK(OBD_FAIL_OST_NODESTROY))
goto done;
- LCONSOLE(D_INFO, "%s: deleting orphan objects from "DOSTID
- " to "DOSTID"\n", ofd_name(ofd), seq, end_id + 1, seq, last);
+ deuuidify(exp->exp_client_uuid.uuid, NULL, &target_start, &target_len);
+ LCONSOLE(D_INFO, "%s: new connection from %.*s (cleaning up unused objects from "DOSTID" to "DOSTID")\n",
+ ofd_name(ofd), target_len, target_start, seq, end_id + 1, seq,
+ last);
while (oid > end_id) {
rc = fid_set_id(fid, oid);
*/
static int ofd_create_hdl(struct tgt_session_info *tsi)
{
- struct ptlrpc_request *req = tgt_ses_req(tsi);
- struct ost_body *repbody;
- const struct obdo *oa = &tsi->tsi_ost_body->oa;
- struct obdo *rep_oa;
- struct obd_export *exp = tsi->tsi_exp;
- struct ofd_device *ofd = ofd_exp(exp);
- u64 seq = ostid_seq(&oa->o_oi);
- u64 oid = ostid_id(&oa->o_oi);
- struct ofd_seq *oseq;
+ struct ptlrpc_request *req = tgt_ses_req(tsi);
+ struct ost_body *repbody;
+ const struct obdo *oa = &tsi->tsi_ost_body->oa;
+ struct obdo *rep_oa;
+ struct obd_export *exp = tsi->tsi_exp;
+ struct ofd_device *ofd = ofd_exp(exp);
+ struct seq_server_site *ss = &ofd->ofd_seq_site;
+ __u64 seq_width = ss->ss_client_seq->lcs_width;
+ u64 seq = ostid_seq(&oa->o_oi);
+ u64 oid = ostid_id(&oa->o_oi);
+ struct ofd_seq *oseq;
+ int sync_trans = 0;
+ long granted = 0;
+ ktime_t kstart = ktime_get();
s64 diff;
int rc = 0;
- int sync_trans = 0;
- long granted = 0;
ENTRY;
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
+ if (CFS_FAIL_CHECK(OBD_FAIL_OST_EROFS))
RETURN(-EROFS);
- if (ofd->ofd_no_precreate)
+ if (ofd->ofd_lut.lut_no_create)
return -EPERM;
repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
rep_oa = &repbody->oa;
rep_oa->o_oi = oa->o_oi;
+ rep_oa->o_valid |= OBD_MD_FLSIZE;
+ rep_oa->o_size = seq_width;
LASSERT(oa->o_valid & OBD_MD_FLGROUP);
if ((oa->o_valid & OBD_MD_FLFLAGS) &&
(oa->o_flags & OBD_FL_RECREATE_OBJS)) {
- if (!ofd_obd(ofd)->obd_recovering ||
+ if (!test_bit(OBDF_RECOVERING, ofd_obd(ofd)->obd_flags) ||
oid > ofd_seq_last_oid(oseq)) {
CERROR("%s: recreate objid "DOSTID" > last id %llu"
"\n", ofd_name(ofd), POSTID(&oa->o_oi),
(oa->o_flags & OBD_FL_DELORPHAN)) {
exp->exp_filter_data.fed_lastid_gen = ofd->ofd_lastid_gen;
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OST_DELORPHAN_DELAY, cfs_fail_val);
/* destroy orphans */
if (lustre_msg_get_conn_cnt(tgt_ses_req(tsi)->rq_reqmsg) <
exp->exp_conn_cnt) {
CERROR("%s: dropping old orphan cleanup request\n",
ofd_name(ofd));
- GOTO(out_nolock, rc = 0);
+ GOTO(out_nolock, rc = -ESTALE);
}
/* This causes inflight precreates to abort and drop lock */
oseq->os_destroys_in_progress = 1;
exp->exp_conn_cnt) {
CERROR("%s: dropping old precreate request\n",
ofd_name(ofd));
- GOTO(out, rc = 0);
+ GOTO(out, rc = -ESTALE);
}
/* only precreate if seq is 0, IDIF or normal and also o_id
* must be specfied */
} else {
diff = oid - ofd_seq_last_oid(oseq);
/* Do sync create if the seq is about to used up */
- if (fid_seq_is_idif(seq) || fid_seq_is_mdt0(seq)) {
- if (unlikely(oid >= IDIF_MAX_OID - 1))
- sync_trans = 1;
- } else if (fid_seq_is_norm(seq)) {
- if (unlikely(oid >=
- LUSTRE_DATA_SEQ_MAX_WIDTH - 1))
- sync_trans = 1;
- } else {
- CERROR("%s : invalid o_seq "DOSTID"\n",
- ofd_name(ofd), POSTID(&oa->o_oi));
- GOTO(out, rc = -EINVAL);
- }
+ sync_trans = ofd_seq_is_exhausted(ofd, oa);
+ if (sync_trans < 0)
+ GOTO(out, rc = sync_trans);
if (diff < 0) {
- /* LU-5648 */
- CERROR("%s: invalid precreate request for "
- DOSTID", last_id %llu. "
- "Likely MDS last_id corruption\n",
- ofd_name(ofd), POSTID(&oa->o_oi),
- ofd_seq_last_oid(oseq));
- GOTO(out, rc = -EINVAL);
+ LCONSOLE(D_INFO,
+ "%s: MDS LAST_ID "DFID" (%llu) is %lld behind OST LAST_ID "DFID" (%llu), trust the OST\n",
+ ofd_name(ofd), PFID(&oa->o_oi.oi_fid),
+ oid, -diff, PFID(&oseq->os_oi.oi_fid),
+ ofd_seq_last_oid(oseq));
+ /* Let MDS know that we are so far ahead. */
+ rc = ostid_set_id(&rep_oa->o_oi,
+ ofd_seq_last_oid(oseq) + 1);
}
}
}
if (diff > 0) {
time64_t enough_time = ktime_get_seconds() + DISK_TIMEOUT;
+ bool trans_local;
u64 next_id;
int created = 0;
int count;
int rc2;
+ /* This can happen if a new OST is formatted and installed
+ * in place of an old one at the same index. Instead of
+ * precreating potentially millions of deleted old objects
+ * (possibly filling the OST), only precreate the last batch.
+ * LFSCK will eventually clean up any orphans. LU-14 */
+ if (diff > 5 * OST_MAX_PRECREATE) {
+ /* Message below is checked in conf-sanity test_122b */
+ LCONSOLE_WARN("%s: precreate FID "DOSTID" is over %lld higher than LAST_ID "DOSTID", only precreating the last %llu objects. OST replaced or reformatted?\n",
+ ofd_name(ofd), POSTID(&oa->o_oi), diff,
+ POSTID(&oseq->os_oi),
+ min(seq_width, (__u64)OST_MAX_PRECREATE));
+ /* From last created */
+ diff = min(seq_width, (__u64)OST_MAX_PRECREATE);
+ ofd_seq_last_oid_set(oseq, ostid_id(&oa->o_oi) - diff);
+ /* no sync_trans when recreating last batch */
+ sync_trans = 0;
+ }
+
if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
!(oa->o_flags & OBD_FL_DELORPHAN)) {
/* don't enforce grant during orphan recovery */
}
}
- /* This can happen if a new OST is formatted and installed
- * in place of an old one at the same index. Instead of
- * precreating potentially millions of deleted old objects
- * (possibly filling the OST), only precreate the last batch.
- * LFSCK will eventually clean up any orphans. LU-14 */
- if (diff > 5 * OST_MAX_PRECREATE) {
- diff = OST_MAX_PRECREATE / 2;
- LCONSOLE_WARN("%s: Too many FIDs to precreate "
- "OST replaced or reformatted: "
- "LFSCK will clean up",
- ofd_name(ofd));
-
- CDEBUG(D_HA, "%s: precreate FID "DOSTID" is over "
- "%u larger than the LAST_ID "DOSTID", only "
- "precreating the last %lld objects.\n",
- ofd_name(ofd), POSTID(&oa->o_oi),
- 5 * OST_MAX_PRECREATE,
- POSTID(&oseq->os_oi), diff);
- ofd_seq_last_oid_set(oseq, ostid_id(&oa->o_oi) - diff);
- }
-
+ trans_local = !exp_connect_replay_create(req->rq_export);
while (diff > 0) {
next_id = ofd_seq_last_oid(oseq) + 1;
count = ofd_precreate_batch(ofd, (int)diff);
}
rc = ofd_precreate_objects(tsi->tsi_env, ofd, next_id,
- oseq, count, sync_trans);
+ oseq, count, sync_trans,
+ trans_local);
if (rc > 0) {
created += rc;
diff -= rc;
}
EXIT;
ofd_counter_incr(exp, LPROC_OFD_STATS_CREATE,
- tsi->tsi_jobid, 1);
+ tsi->tsi_jobid, ktime_us_delta(ktime_get(), kstart));
if (unlikely(!oseq->os_last_id_synced))
oseq->os_last_id_synced = 1;
out:
struct ofd_device *ofd = ofd_exp(tsi->tsi_exp);
struct ofd_thread_info *fti = tsi2ofd_info(tsi);
struct lu_fid *fid = &fti->fti_fid;
+ ktime_t kstart = ktime_get();
u64 oid;
u32 count;
int rc = 0;
ENTRY;
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
+ if (CFS_FAIL_CHECK(OBD_FAIL_OST_EROFS))
RETURN(-EROFS);
/* This is old case for clients before Lustre 2.4 */
}
ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_DESTROY,
- tsi->tsi_jobid, 1);
+ tsi->tsi_jobid, ktime_us_delta(ktime_get(), kstart));
GOTO(out, rc);
*/
static int ofd_statfs_hdl(struct tgt_session_info *tsi)
{
+ ktime_t kstart = ktime_get();
struct obd_statfs *osfs;
int rc;
ENTRY;
- OBD_FAIL_TIMEOUT(OBD_FAIL_OST_STATFS_DELAY, 10);
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OST_STATFS_DELAY, 10);
osfs = req_capsule_server_get(tsi->tsi_pill, &RMF_OBD_STATFS);
CERROR("%s: statfs failed: rc = %d\n",
tgt_name(tsi->tsi_tgt), rc);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_EINPROGRESS))
+ if (CFS_FAIL_CHECK(OBD_FAIL_OST_STATFS_EINPROGRESS))
rc = -EINPROGRESS;
ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_STATFS,
- tsi->tsi_jobid, 1);
+ tsi->tsi_jobid, ktime_us_delta(ktime_get(), kstart));
RETURN(rc);
}
struct ofd_thread_info *fti = tsi2ofd_info(tsi);
struct ofd_device *ofd = ofd_exp(tsi->tsi_exp);
struct ofd_object *fo = NULL;
+ ktime_t kstart = ktime_get();
int rc = 0;
ENTRY;
GOTO(put, rc);
ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_SYNC,
- tsi->tsi_jobid, 1);
+ tsi->tsi_jobid, ktime_us_delta(ktime_get(), kstart));
if (fo == NULL)
RETURN(0);
struct ldlm_resource *res;
struct ofd_object *fo;
__u64 flags = 0;
+ __u64 valid;
struct lustre_handle lh = { 0, };
int rc, mode;
__u64 start, end;
bool srvlock;
+ ktime_t kstart = ktime_get();
repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
if (repbody == NULL)
RETURN(err_serious(-ENOMEM));
/*
- * fallocate start and end are passed in o_size, o_blocks
- * on the wire.
+ * fallocate() start and end are passed in o_size and o_blocks
+ * on the wire. Clients 2.15.0 and newer should always set
+ * the OBD_MD_FLSIZE and OBD_MD_FLBLOCKS valid flags, but some
+ * older client versions (exp_old_falloc is true) did not.
+ * Older clients are permitted to omit these flags; their version
+ * is checked by proxy, using the lack of OBD_CONNECT_TRUNCLOCK to
+ * imply 2.14.0 and older.
+ *
+ * Return -EOPNOTSUPP when the flags are missing, which also works
+ * with older clients that do not support newer server modes.
*/
+ if ((oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
+ (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 21, 53, 0)
+ && !tsi->tsi_exp->exp_old_falloc
+#endif
+ )
+ RETURN(-EOPNOTSUPP);
+
start = oa->o_size;
end = oa->o_blocks;
+ CDEBUG(D_INFO, "%s: start: %llu end: %llu\n", tgt_name(tsi->tsi_tgt),
+ start, end);
+
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 21, 53, 0)
+ /* Interop with older clients (where exp_old_falloc is true): in the
+ * interior case, where the end offset is less than the file size,
+ * fallocate() start and end are passed in as 0 (this was fixed later).
+ * Return -EOPNOTSUPP for such requests.
+ */
+ if (tsi->tsi_exp->exp_old_falloc && start >= end)
+ RETURN(-EOPNOTSUPP);
+#endif
+ /* client should already ensure start < end, i.e. len > 0 */
+ if (start >= end)
+ RETURN(-EINVAL);
+
mode = oa->o_falloc_mode;
/*
- * Only mode == 0 (which is standard prealloc) is supported now.
- * Punch is not supported yet.
+ * mode == 0 (standard preallocation) and the KEEP_SIZE, PUNCH_HOLE
+ * and ZERO_RANGE flags are supported; other modes are not supported yet.
*/
- if (mode & ~FALLOC_FL_KEEP_SIZE)
+ if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE |
+ FALLOC_FL_ZERO_RANGE))
RETURN(-EOPNOTSUPP);
+ /* PUNCH_HOLE mode must always be accompanied by the KEEP_SIZE flag.
+ * If a request omits it, log a debug message and add the missing flag
+ * instead of failing the call.
+ */
+ if (mode & FALLOC_FL_PUNCH_HOLE && !(mode & FALLOC_FL_KEEP_SIZE)) {
+ CDEBUG(D_INFO, "%s: PUNCH mode misses KEEP_SIZE flag, setting it\n",
+ tsi->tsi_tgt->lut_obd->obd_name);
+ mode |= FALLOC_FL_KEEP_SIZE;
+ }
+
repbody->oa.o_oi = oa->o_oi;
repbody->oa.o_valid = OBD_MD_FLID;
if (IS_ERR(fo))
GOTO(out, rc = PTR_ERR(fo));
- la_from_obdo(&info->fti_attr, oa,
- OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME);
+ valid = OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLPROJID |
+ OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+ la_from_obdo(&info->fti_attr, oa, valid);
rc = ofd_object_fallocate(tsi->tsi_env, fo, start, end, mode,
&info->fti_attr, oa);
rc = ofd_attr_get(tsi->tsi_env, fo, &info->fti_attr);
if (rc == 0)
- obdo_from_la(&repbody->oa, &info->fti_attr,
- OFD_VALID_FLAGS);
+ obdo_from_la(&repbody->oa, &info->fti_attr, OFD_VALID_FLAGS);
else
rc = 0;
ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_PREALLOC,
- tsi->tsi_jobid, 1);
+ tsi->tsi_jobid, ktime_us_delta(ktime_get(), kstart));
EXIT;
out_put:
ofd_object_put(tsi->tsi_env, fo);
out:
if (srvlock)
- tgt_extent_unlock(&lh, LCK_PW);
+ tgt_data_unlock(&lh, LCK_PW);
if (rc == 0) {
- res = ldlm_resource_get(ns, NULL, &tsi->tsi_resid,
- LDLM_EXTENT, 0);
+ res = ldlm_resource_get(ns, &tsi->tsi_resid, LDLM_EXTENT, 0);
if (!IS_ERR(res)) {
struct ost_lvb *res_lvb;
RETURN(rc);
}
-
/**
* OFD request handler for OST_PUNCH RPC.
*
struct ofd_object *fo;
__u64 flags = 0;
struct lustre_handle lh = { 0, };
- int rc;
__u64 start, end;
bool srvlock;
+ ktime_t kstart = ktime_get();
+ int rc;
ENTRY;
- OBD_FAIL_TIMEOUT(OBD_FAIL_OST_PAUSE_PUNCH, cfs_fail_val);
-
- /* check that we do support OBD_CONNECT_TRUNCLOCK. */
- BUILD_BUG_ON(!(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK));
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OST_PAUSE_PUNCH, cfs_fail_val);
if ((oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
GOTO(out, rc = PTR_ERR(fo));
la_from_obdo(&info->fti_attr, oa,
- OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME);
+ OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME |
+ OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLPROJID);
info->fti_attr.la_size = start;
info->fti_attr.la_valid |= LA_SIZE;
GOTO(out_put, rc);
ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_PUNCH,
- tsi->tsi_jobid, 1);
+ tsi->tsi_jobid, ktime_us_delta(ktime_get(), kstart));
EXIT;
out_put:
ofd_object_put(tsi->tsi_env, fo);
out:
if (srvlock)
- tgt_extent_unlock(&lh, LCK_PW);
+ tgt_data_unlock(&lh, LCK_PW);
if (rc == 0) {
/* we do not call this before to avoid lu_object_find() in
* ->lvbo_update() holding another reference on the object.
* otherwise concurrent destroy can make the object unavailable
* for 2nd lu_object_find() waiting for the first reference
* to go... deadlock! */
- res = ldlm_resource_get(ns, NULL, &tsi->tsi_resid,
- LDLM_EXTENT, 0);
+ res = ldlm_resource_get(ns, &tsi->tsi_resid, LDLM_EXTENT, 0);
if (!IS_ERR(res)) {
struct ost_lvb *res_lvb;
req->rq_status = ofd_ladvise_prefetch(env, fo,
tbc->local,
start, end, dbt);
- tgt_extent_unlock(&lockh, LCK_PR);
+ tgt_data_unlock(&lockh, LCK_PR);
break;
case LU_LADVISE_DONTNEED:
rc = dt_ladvise(env, dob, ladvise->lla_start,
{
struct obd_quotactl *oqctl, *repoqc;
struct lu_nodemap *nodemap;
+ ktime_t kstart = ktime_get();
+ char *buffer = NULL;
int id;
int rc;
-
ENTRY;
oqctl = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_QUOTACTL);
if (oqctl == NULL)
RETURN(err_serious(-EPROTO));
+ if (oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA)
+ req_capsule_set_size(tsi->tsi_pill, &RMF_OBD_QUOTA_ITER,
+ RCL_SERVER, LQUOTA_ITER_BUFLEN);
+ else
+ req_capsule_set_size(tsi->tsi_pill, &RMF_OBD_QUOTA_ITER,
+ RCL_SERVER, 0);
+
+ rc = req_capsule_server_pack(tsi->tsi_pill);
+ if (rc)
+ RETURN(err_serious(rc));
+
repoqc = req_capsule_server_get(tsi->tsi_pill, &RMF_OBD_QUOTACTL);
if (repoqc == NULL)
RETURN(err_serious(-ENOMEM));
- *repoqc = *oqctl;
+ if (oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA) {
+ buffer = req_capsule_server_get(tsi->tsi_pill,
+ &RMF_OBD_QUOTA_ITER);
+ if (buffer == NULL)
+ RETURN(err_serious(-ENOMEM));
+ }
nodemap = nodemap_get_from_exp(tsi->tsi_exp);
if (IS_ERR(nodemap))
RETURN(PTR_ERR(nodemap));
- id = repoqc->qc_id;
+ id = oqctl->qc_id;
if (oqctl->qc_type == USRQUOTA)
id = nodemap_map_id(nodemap, NODEMAP_UID,
- NODEMAP_CLIENT_TO_FS,
- repoqc->qc_id);
+ NODEMAP_CLIENT_TO_FS, id);
else if (oqctl->qc_type == GRPQUOTA)
id = nodemap_map_id(nodemap, NODEMAP_GID,
- NODEMAP_CLIENT_TO_FS,
- repoqc->qc_id);
+ NODEMAP_CLIENT_TO_FS, id);
+ else if (oqctl->qc_type == PRJQUOTA)
+ id = nodemap_map_id(nodemap, NODEMAP_PROJID,
+ NODEMAP_CLIENT_TO_FS, id);
+ if (oqctl->qc_cmd == LUSTRE_Q_ITEROQUOTA)
+ rc = lquota_iter_change_qid(nodemap, oqctl);
nodemap_putref(nodemap);
+ if (rc)
+ RETURN(rc);
- if (repoqc->qc_id != id)
- swap(repoqc->qc_id, id);
+ if (oqctl->qc_id != id)
+ swap(oqctl->qc_id, id);
- rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, repoqc);
+ rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, nodemap,
+ oqctl, buffer);
ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_QUOTACTL,
- tsi->tsi_jobid, 1);
+ tsi->tsi_jobid, ktime_us_delta(ktime_get(), kstart));
+
+ if (oqctl->qc_id != id)
+ swap(oqctl->qc_id, id);
- if (repoqc->qc_id != id)
- swap(repoqc->qc_id, id);
+ QCTL_COPY_NO_PNAME(repoqc, oqctl);
RETURN(rc);
}
ENTRY;
- data->lpa_timeout = prolong_timeout(tgt_ses_req(tsi));
+ data->lpa_req = tgt_ses_req(tsi);
data->lpa_export = tsi->tsi_exp;
data->lpa_resid = tsi->tsi_resid;
/* bingo */
LASSERT(lock->l_export == data->lpa_export);
ldlm_lock_prolong_one(lock, data);
- LDLM_LOCK_PUT(lock);
+ ldlm_lock_put(lock);
if (data->lpa_locks_cnt > 0)
RETURN_EXIT;
/* The lock was destroyed probably lets try
* resource tree. */
} else {
lock->l_last_used = ktime_get();
- LDLM_LOCK_PUT(lock);
+ ldlm_lock_put(lock);
}
}
}
OST_PUNCH, ofd_punch_hdl,
ofd_hp_punch),
TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SYNC, ofd_sync_hdl),
-TGT_OST_HDL(HAS_REPLY, OST_QUOTACTL, ofd_quotactl),
+TGT_OST_HDL(0, OST_QUOTACTL, ofd_quotactl),
TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_LADVISE, ofd_ladvise_hdl),
-TGT_OST_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_FALLOCATE, ofd_fallocate_hdl)
+TGT_OST_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_FALLOCATE, ofd_fallocate_hdl),
+TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_SEEK, tgt_lseek),
};
static struct tgt_opc_slice ofd_common_slice[] = {
struct ofd_thread_info *info = NULL;
struct obd_device *obd;
struct tg_grants_data *tgd = &m->ofd_lut.lut_tgd;
+ DECLARE_BITMAP(lmd_flags, LMD_FLG_NUM_FLAGS);
struct lu_fid fid;
struct nm_config_file *nodemap_config;
struct obd_device_target *obt;
- u32 lmd_flags = 0;
int rc;
ENTRY;
if (rc != 0)
RETURN(rc);
- obt = &obd->u.obt;
- obt->obt_magic = OBT_MAGIC;
+ obt = obd_obt_init(obd);
spin_lock_init(&m->ofd_flags_lock);
m->ofd_raid_degraded = 0;
- m->ofd_checksum_t10pi_enforce = 0;
m->ofd_sync_journal = 0;
ofd_slc_set(m);
m->ofd_soft_sync_limit = OFD_SOFT_SYNC_LIMIT_DEFAULT;
m->ofd_seq_count = 0;
- init_waitqueue_head(&m->ofd_inconsistency_thread.t_ctl_waitq);
INIT_LIST_HEAD(&m->ofd_inconsistency_list);
spin_lock_init(&m->ofd_inconsistency_lock);
if (info == NULL)
RETURN(-EFAULT);
- rc = ofd_stack_init(env, m, cfg, &lmd_flags);
+ rc = ofd_stack_init(env, m, cfg, lmd_flags);
if (rc) {
CERROR("%s: can't init device stack, rc %d\n",
obd->obd_name, rc);
RETURN(rc);
}
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 14, 53, 0)
- ofd_procfs_add_brw_stats_symlink(m);
-#endif
-
snprintf(info->fti_u.name, sizeof(info->fti_u.name), "%s-%s",
"filter"/*LUSTRE_OST_NAME*/, obd->obd_uuid.uuid);
m->ofd_namespace = ldlm_namespace_new(obd, info->fti_u.name,
LDLM_NAMESPACE_SERVER,
LDLM_NAMESPACE_GREEDY,
LDLM_NS_TYPE_OST);
- if (m->ofd_namespace == NULL)
- GOTO(err_fini_stack, rc = -ENOMEM);
+ if (IS_ERR(m->ofd_namespace)) {
+ rc = PTR_ERR(m->ofd_namespace);
+ CERROR("%s: unable to create server namespace: rc = %d\n",
+ obd->obd_name, rc);
+ m->ofd_namespace = NULL;
+ GOTO(err_fini_stack, rc);
+ }
/* set obd_namespace for compatibility with old code */
obd->obd_namespace = m->ofd_namespace;
ldlm_register_intent(m->ofd_namespace, ofd_intent_policy);
if (rc)
GOTO(err_free_ns, rc);
- if (lmd_flags & LMD_FLG_SKIP_LFSCK)
+ if (test_bit(LMD_FLG_SKIP_LFSCK, lmd_flags))
m->ofd_skip_lfsck = 1;
- if (lmd_flags & LMD_FLG_LOCAL_RECOV)
+ if (test_bit(LMD_FLG_LOCAL_RECOV, lmd_flags))
m->ofd_lut.lut_local_recovery = 1;
rc = ofd_tunables_init(m);
tgd->tgd_reserved_pcnt = 0;
m->ofd_brw_size = m->ofd_lut.lut_dt_conf.ddp_brw_size;
- m->ofd_cksum_types_supported =
- obd_cksum_types_supported_server(obd->obd_name);
m->ofd_precreate_batch = OFD_PRECREATE_BATCH_DEFAULT;
if (tgd->tgd_osfs.os_bsize * tgd->tgd_osfs.os_blocks <
OFD_PRECREATE_SMALL_FS)
ofd_stop_inconsistency_verification_thread(m);
lfsck_degister(env, m->ofd_osd);
ofd_fs_cleanup(env, m);
- nm_config_file_deregister_tgt(env, obd->u.obt.obt_nodemap_config_file);
- obd->u.obt.obt_nodemap_config_file = NULL;
+ nm_config_file_deregister_tgt(env,
+ obd2obt(obd)->obt_nodemap_config_file);
+ obd2obt(obd)->obt_nodemap_config_file = NULL;
if (m->ofd_namespace != NULL) {
ldlm_namespace_free_post(m->ofd_namespace);
{
ENTRY;
ofd_fini(env, ofd_dev(d));
+ target_cleanup_recovery(d->ld_obd);
RETURN(NULL);
}
/* type constructor/destructor: ofd_type_init(), ofd_type_fini() */
LU_TYPE_INIT_FINI(ofd, &ofd_thread_key);
-static struct lu_device_type_operations ofd_device_type_ops = {
+static const struct lu_device_type_operations ofd_device_type_ops = {
.ldto_init = ofd_type_init,
.ldto_fini = ofd_type_fini,
{
int rc;
+ rc = libcfs_setup();
+ if (rc)
+ return rc;
+
rc = lu_kmem_init(ofd_caches);
if (rc)
return rc;
- rc = ofd_access_log_module_init();
+ rc = oss_mod_init();
if (rc)
goto out_caches;
- rc = class_register_type(&ofd_obd_ops, NULL, true, NULL,
+ rc = ofd_access_log_module_init();
+ if (rc)
+ goto out_oss_fini;
+
+ rc = class_register_type(&ofd_obd_ops, NULL, true,
LUSTRE_OST_NAME, &ofd_device_type);
if (rc)
goto out_ofd_access_log;
out_ofd_access_log:
ofd_access_log_module_exit();
+out_oss_fini:
+ oss_mod_exit();
out_caches:
lu_kmem_fini(ofd_caches);
{
class_unregister_type(LUSTRE_OST_NAME);
ofd_access_log_module_exit();
+ oss_mod_exit();
lu_kmem_fini(ofd_caches);
}