* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2015, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_OSD
-#include <lustre_ver.h>
#include <libcfs/libcfs.h>
#include <obd_support.h>
#include <lustre_net.h>
#include <obd_class.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
-#include <lustre_param.h>
+#include <uapi/linux/lustre/lustre_param.h>
#include <md_object.h>
#include "osd_internal.h"
struct lu_context_key osd_key;
+static int osd_txg_sync_delay_us = -1;
+
/* Slab for OSD object allocation */
struct kmem_cache *osd_object_kmem;
osd_dt_dev(th->th_dev)->od_svname, th, error);
}
- dt_txn_hook_commit(th);
-
/* call per-transaction callbacks if any */
- list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
+ list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) {
+ LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
+ "commit callback entry: magic=%x name='%s'\n",
+ dcb->dcb_magic, dcb->dcb_name);
+ list_del_init(&dcb->dcb_linkage);
dcb->dcb_func(NULL, th, dcb, error);
+ }
/* Unlike ldiskfs, zfs updates space accounting at commit time.
* As a consequence, op_end is called only now to inform the quota slave
* should be released. Quota space won't be adjusted at this point since
* we can't provide a suitable environment. It will be performed
* asynchronously by a lquota thread. */
- qsd_op_end(NULL, osd->od_quota_slave, &oh->ot_quota_trans);
+ qsd_op_end(NULL, osd->od_quota_slave_dt, &oh->ot_quota_trans);
+ if (osd->od_quota_slave_md != NULL)
+ qsd_op_end(NULL, osd->od_quota_slave_md, &oh->ot_quota_trans);
lu_device_put(lud);
th->th_dev = NULL;
- lu_context_exit(&th->th_ctx);
- lu_context_fini(&th->th_ctx);
OBD_FREE_PTR(oh);
EXIT;
static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
{
- struct osd_thandle *oh = container_of0(th, struct osd_thandle,
- ot_super);
+ struct osd_thandle *oh = container_of(th, struct osd_thandle,
+ ot_super);
LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
LASSERT(&dcb->dcb_func != NULL);
static int osd_trans_start(const struct lu_env *env, struct dt_device *d,
struct thandle *th)
{
- struct osd_thandle *oh;
- int rc;
+ struct osd_device *osd = osd_dt_dev(d);
+ struct osd_thandle *oh;
+ int rc;
+
ENTRY;
- oh = container_of0(th, struct osd_thandle, ot_super);
+ oh = container_of(th, struct osd_thandle, ot_super);
LASSERT(oh);
LASSERT(oh->ot_tx);
rc = dt_txn_hook_start(env, d, th);
- if (rc != 0)
+ if (rc != 0) {
+ CERROR("%s: dt_txn_hook_start failed: rc = %d\n",
+ osd->od_svname, rc);
RETURN(rc);
+ }
- if (oh->ot_write_commit && OBD_FAIL_CHECK(OBD_FAIL_OST_MAPBLK_ENOSPC))
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSD_TXN_START))
/* Unlike ldiskfs, ZFS checks for available space and returns
* -ENOSPC when assigning txg */
- RETURN(-ENOSPC);
+ RETURN(-EIO);
rc = -dmu_tx_assign(oh->ot_tx, TXG_WAIT);
if (unlikely(rc != 0)) {
- struct osd_device *osd = osd_dt_dev(d);
/* dmu will call commit callback with error code during abort */
if (!lu_device_is_md(&d->dd_lu_dev) && rc == -ENOSPC)
CERROR("%s: failed to start transaction due to ENOSPC"
/* add commit callback */
dmu_tx_callback_register(oh->ot_tx, osd_trans_commit_cb, oh);
oh->ot_assigned = 1;
- lu_context_init(&th->th_ctx, th->th_tags);
- lu_context_enter(&th->th_ctx);
+ osd_oti_get(env)->oti_in_trans = 1;
lu_device_get(&d->dd_lu_dev);
}
RETURN(rc);
}
-static int osd_unlinked_object_free(struct osd_device *osd, uint64_t oid);
-
-static void osd_unlinked_list_emptify(struct osd_device *osd,
+static void osd_unlinked_list_emptify(const struct lu_env *env,
+ struct osd_device *osd,
struct list_head *list, bool free)
{
struct osd_object *obj;
while (!list_empty(list)) {
obj = list_entry(list->next,
struct osd_object, oo_unlinked_linkage);
- LASSERT(obj->oo_db != NULL);
- oid = obj->oo_db->db_object;
+ LASSERT(obj->oo_dn != NULL);
+ oid = obj->oo_dn->dn_object;
list_del_init(&obj->oo_unlinked_linkage);
if (free)
- (void)osd_unlinked_object_free(osd, oid);
+ (void)osd_unlinked_object_free(env, osd, oid);
}
}
struct osd_device *osd = osd_dt_dev(th->th_dev);
bool sync = (th->th_sync != 0);
struct osd_thandle *oh;
- struct list_head unlinked;
+ LIST_HEAD(unlinked);
uint64_t txg;
int rc;
ENTRY;
- oh = container_of0(th, struct osd_thandle, ot_super);
- INIT_LIST_HEAD(&unlinked);
+ oh = container_of(th, struct osd_thandle, ot_super);
list_splice_init(&oh->ot_unlinked_list, &unlinked);
+ osd_oti_get(env)->oti_ins_cache_depth--;
+ /* reset OI cache for safety */
+ if (osd_oti_get(env)->oti_ins_cache_depth == 0)
+ osd_oti_get(env)->oti_ins_cache_used = 0;
+
if (oh->ot_assigned == 0) {
LASSERT(oh->ot_tx);
+ CDEBUG(D_OTHER, "%s: transaction is aborted\n", osd->od_svname);
+ osd_trans_stop_cb(oh, th->th_result);
dmu_tx_abort(oh->ot_tx);
- osd_object_sa_dirty_rele(oh);
- osd_unlinked_list_emptify(osd, &unlinked, false);
+ osd_object_sa_dirty_rele(env, oh);
+ osd_unlinked_list_emptify(env, osd, &unlinked, false);
/* there won't be any commit, release reserved quota space now,
* if any */
- qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
+ qsd_op_end(env, osd->od_quota_slave_dt, &oh->ot_quota_trans);
+ if (osd->od_quota_slave_md != NULL)
+ qsd_op_end(env, osd->od_quota_slave_md,
+ &oh->ot_quota_trans);
OBD_FREE_PTR(oh);
RETURN(0);
}
- /* When doing our own inode accounting, the ZAPs storing per-uid/gid
- * usage are updated at operation execution time, so we should call
- * qsd_op_end() straight away. Otherwise (for blk accounting maintained
- * by ZFS and when #inode is estimated from #blks) accounting is updated
- * at commit time and the call to qsd_op_end() must be delayed */
- if (oh->ot_quota_trans.lqt_id_cnt > 0 &&
- !oh->ot_quota_trans.lqt_ids[0].lqi_is_blk &&
- !osd->od_quota_iused_est)
- qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
-
rc = dt_txn_hook_stop(env, th);
if (rc != 0)
CDEBUG(D_OTHER, "%s: transaction hook failed: rc = %d\n",
LASSERT(oh->ot_tx);
txg = oh->ot_tx->tx_txg;
- osd_object_sa_dirty_rele(oh);
+ osd_object_sa_dirty_rele(env, oh);
/* XXX: Once dmu_tx_commit() called, oh/th could have been freed
* by osd_trans_commit_cb already. */
dmu_tx_commit(oh->ot_tx);
+ osd_oti_get(env)->oti_in_trans = 0;
- osd_unlinked_list_emptify(osd, &unlinked, true);
+ osd_unlinked_list_emptify(env, osd, &unlinked, true);
- if (sync)
- txg_wait_synced(dmu_objset_pool(osd->od_os), txg);
+ if (sync) {
+ if (osd_txg_sync_delay_us < 0)
+ txg_wait_synced(dmu_objset_pool(osd->od_os), txg);
+ else
+ udelay(osd_txg_sync_delay_us);
+ }
RETURN(rc);
}
dmu_tx_t *tx;
ENTRY;
+ if (dt->dd_rdonly) {
+ CERROR("%s: someone try to start transaction under "
+ "readonly mode, should be disabled.\n",
+ osd_name(osd_dt_dev(dt)));
+ dump_stack();
+ RETURN(ERR_PTR(-EROFS));
+ }
+
tx = dmu_tx_create(osd->od_os);
if (tx == NULL)
RETURN(ERR_PTR(-ENOMEM));
INIT_LIST_HEAD(&oh->ot_stop_dcb_list);
INIT_LIST_HEAD(&oh->ot_unlinked_list);
INIT_LIST_HEAD(&oh->ot_sa_list);
- sema_init(&oh->ot_sa_lock, 1);
memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans));
th = &oh->ot_super;
th->th_dev = dt;
th->th_result = 0;
- th->th_tags = LCT_TX_HANDLE;
+
+ osd_oti_get(env)->oti_ins_cache_depth++;
+
RETURN(th);
}
* gradually disappears as the number of real dnodes grows. It also
* avoids the need to check for divide-by-zero computing dn_per_block.
*/
- CLASSERT(OSD_DNODE_MIN_BLKSHIFT > 0);
- CLASSERT(OSD_DNODE_EST_BLKSHIFT > 0);
+ BUILD_BUG_ON(OSD_DNODE_MIN_BLKSHIFT <= 0);
+ BUILD_BUG_ON(OSD_DNODE_EST_BLKSHIFT <= 0);
- est_usedblocks = (usedbytes >> est_maxblockshift) +
- (OSD_DNODE_EST_COUNT >> OSD_DNODE_EST_BLKSHIFT);
- est_usedobjs = usedobjs + OSD_DNODE_EST_COUNT;
+ est_usedblocks = ((OSD_DNODE_EST_COUNT << OSD_DNODE_EST_BLKSHIFT) +
+ usedbytes) >> est_maxblockshift;
+ est_usedobjs = OSD_DNODE_EST_COUNT + usedobjs;
if (est_usedobjs <= est_usedblocks) {
/*
osfs->os_bavail = osfs->os_bfree; /* no extra root reservation */
/* Take replication (i.e. number of copies) into account */
- osfs->os_bavail /= os->os_copies;
+ if (os->os_copies != 0)
+ osfs->os_bavail /= os->os_copies;
/*
* Reserve some space so we don't run into ENOSPC due to grants not
* Reserve 0.78% of total space, at least 16MB for small filesystems,
* for internal files to be created/unlinked when space is tight.
*/
- CLASSERT(OSD_STATFS_RESERVED_SIZE > 0);
+ BUILD_BUG_ON(OSD_STATFS_RESERVED_SIZE <= 0);
reserved = OSD_STATFS_RESERVED_SIZE >> bshift;
if (likely(osfs->os_blocks >= reserved << OSD_STATFS_RESERVED_SHIFT))
reserved = osfs->os_blocks >> OSD_STATFS_RESERVED_SHIFT;
if (!spa_writeable(dmu_objset_spa(os)) ||
osd->od_dev_set_rdonly || osd->od_prop_rdonly)
- osfs->os_state |= OS_STATE_READONLY;
+ osfs->os_state |= OS_STATFS_READONLY;
return 0;
}
* Concurrency: shouldn't matter.
*/
int osd_statfs(const struct lu_env *env, struct dt_device *d,
- struct obd_statfs *osfs)
+ struct obd_statfs *osfs, struct obd_statfs_info *info)
{
- int rc;
+ struct osd_device *osd = osd_dt_dev(d);
+ int rc;
ENTRY;
- rc = osd_objset_statfs(osd_dt_dev(d), osfs);
+ rc = osd_objset_statfs(osd, osfs);
if (unlikely(rc != 0))
RETURN(rc);
osfs->os_bavail -= min_t(u64,
OSD_GRANT_FOR_LOCAL_OIDS / osfs->os_bsize,
osfs->os_bavail);
+
+	/* ZFS does not support reporting nonrotational status yet, so return
+ * flag only if user has set nonrotational.
+ */
+ osfs->os_state |= osd->od_nonrotational ? OS_STATFS_NONROT : 0;
+
RETURN(0);
}
/* nr_blkptrshift is the log2 of the number of block pointers that can
* be stored in an indirect block */
- CLASSERT(DN_MAX_INDBLKSHIFT > SPA_BLKPTRSHIFT);
+ BUILD_BUG_ON(DN_MAX_INDBLKSHIFT <= SPA_BLKPTRSHIFT);
nr_blkptrshift = DN_MAX_INDBLKSHIFT - SPA_BLKPTRSHIFT;
/* max_blockshift / nr_blkptrshift is thus the maximum depth of the
param->ddp_mntopts = MNTOPT_USERXATTR;
if (osd->od_posix_acl)
param->ddp_mntopts |= MNTOPT_ACL;
- param->ddp_max_ea_size = DXATTR_MAX_ENTRY_SIZE;
+ /* Previously DXATTR_MAX_ENTRY_SIZE */
+ param->ddp_max_ea_size = OBD_MAX_EA_SIZE;
/* for maxbytes, report same value as ZPL */
param->ddp_maxbytes = MAX_LFS_FILESIZE;
param->ddp_max_extent_blks =
(1 << (DN_MAX_INDBLKSHIFT - SPA_BLKPTRSHIFT));
param->ddp_extent_tax = osd_blk_insert_cost(osd);
+
+ /* Preferred RPC size for efficient disk IO. 1MB shows good
+ * all-around performance for ZFS, but use blocksize (recordsize)
+ * by default if larger to avoid read-modify-write. */
+ if (osd->od_max_blksz > ONE_MB_BRW_SIZE)
+ param->ddp_brw_size = osd->od_max_blksz;
+ else
+ param->ddp_brw_size = ONE_MB_BRW_SIZE;
+
+#ifdef HAVE_DMU_OFFSET_NEXT
+ param->ddp_has_lseek_data_hole = true;
+#else
+ param->ddp_has_lseek_data_hole = false;
+#endif
}
/*
*/
static int osd_sync(const struct lu_env *env, struct dt_device *d)
{
- struct osd_device *osd = osd_dt_dev(d);
- CDEBUG(D_CACHE, "syncing OSD %s\n", LUSTRE_OSD_ZFS_NAME);
- txg_wait_synced(dmu_objset_pool(osd->od_os), 0ULL);
- CDEBUG(D_CACHE, "synced OSD %s\n", LUSTRE_OSD_ZFS_NAME);
+ if (!d->dd_rdonly) {
+ struct osd_device *osd = osd_dt_dev(d);
+
+ CDEBUG(D_CACHE, "syncing OSD %s\n", LUSTRE_OSD_ZFS_NAME);
+ txg_wait_synced(dmu_objset_pool(osd->od_os), 0ULL);
+ CDEBUG(D_CACHE, "synced OSD %s\n", LUSTRE_OSD_ZFS_NAME);
+ }
+
return 0;
}
RETURN(0);
}
-static struct dt_device_operations osd_dt_ops = {
+static const struct dt_device_operations osd_dt_ops = {
.dt_root_get = osd_root_get,
.dt_statfs = osd_statfs,
.dt_trans_create = osd_trans_create,
struct lu_context_key *key, void *data)
{
struct osd_thread_info *info = data;
+ struct osd_idmap_cache *idc = info->oti_ins_cache;
+ if (idc != NULL) {
+ LASSERT(info->oti_ins_cache_size > 0);
+ OBD_FREE_PTR_ARRAY_LARGE(idc, info->oti_ins_cache_size);
+ info->oti_ins_cache = NULL;
+ info->oti_ins_cache_size = 0;
+ }
+ lu_buf_free(&info->oti_xattr_lbuf);
OBD_FREE_PTR(info);
}
static void osd_key_exit(const struct lu_context *ctx,
struct lu_context_key *key, void *data)
{
- struct osd_thread_info *info = data;
-
- memset(info, 0, sizeof(*info));
}
struct lu_context_key osd_key = {
ENTRY;
/* shutdown quota slave instance associated with the device */
- if (o->od_quota_slave != NULL) {
- qsd_fini(env, o->od_quota_slave);
- o->od_quota_slave = NULL;
+ if (o->od_quota_slave_md != NULL) {
+ /* complete all in-flight callbacks */
+ osd_sync(env, &o->od_dt_dev);
+ txg_wait_callbacks(spa_get_dsl(dmu_objset_spa(o->od_os)));
+ qsd_fini(env, o->od_quota_slave_md);
+ o->od_quota_slave_md = NULL;
}
+ if (o->od_quota_slave_dt != NULL) {
+ /* complete all in-flight callbacks */
+ osd_sync(env, &o->od_dt_dev);
+ txg_wait_callbacks(spa_get_dsl(dmu_objset_spa(o->od_os)));
+ qsd_fini(env, o->od_quota_slave_dt);
+ o->od_quota_slave_dt = NULL;
+ }
osd_fid_fini(env, o);
RETURN(0);
osd->od_prop_rdonly = !!newval;
}
+#ifdef HAVE_DMU_OBJECT_ALLOC_DNSIZE
+static void osd_dnodesize_changed_cb(void *arg, uint64_t newval)
+{
+ struct osd_device *osd = arg;
+
+ osd->od_dnsize = newval;
+}
+#endif
/*
* This function unregisters all registered callbacks. It's harmless to
* unregister callbacks that were never registered so it is used to safely
osd_recordsize_changed_cb, o);
(void) dsl_prop_unregister(ds, zfs_prop_to_name(ZFS_PROP_READONLY),
osd_readonly_changed_cb, o);
+#ifdef HAVE_DMU_OBJECT_ALLOC_DNSIZE
+ (void) dsl_prop_unregister(ds, zfs_prop_to_name(ZFS_PROP_DNODESIZE),
+ osd_dnodesize_changed_cb, o);
+#endif
if (o->arc_prune_cb != NULL) {
arc_remove_prune_callback(o->arc_prune_cb);
if (rc)
GOTO(err, rc);
+#ifdef HAVE_DMU_OBJECT_ALLOC_DNSIZE
+ rc = -dsl_prop_register(ds, zfs_prop_to_name(ZFS_PROP_DNODESIZE),
+ osd_dnodesize_changed_cb, o);
+ if (rc)
+ GOTO(err, rc);
+#endif
+
o->arc_prune_cb = arc_add_prune_callback(arc_prune_func, o);
err:
dsl_pool_config_exit(dp, FTAG);
static int osd_objset_open(struct osd_device *o)
{
uint64_t version = ZPL_VERSION;
- uint64_t sa_obj;
+ uint64_t sa_obj, unlink_obj;
int rc;
ENTRY;
- rc = -dmu_objset_own(o->od_mntdev, DMU_OST_ZFS, B_FALSE, o, &o->od_os);
+ rc = -osd_dmu_objset_own(o->od_mntdev, DMU_OST_ZFS,
+ o->od_dt_dev.dd_rdonly ? B_TRUE : B_FALSE,
+ B_TRUE, o, &o->od_os);
+
if (rc) {
CERROR("%s: can't open %s\n", o->od_svname, o->od_mntdev);
o->od_os = NULL;
- goto out;
+
+ GOTO(out, rc);
}
/* Check ZFS version */
}
rc = -zap_lookup(o->od_os, MASTER_NODE_OBJ, ZFS_UNLINKED_SET,
- 8, 1, &o->od_unlinkedid);
+ 8, 1, &unlink_obj);
if (rc) {
CERROR("%s: lookup for %s failed: rc = %d\n",
o->od_svname, ZFS_UNLINKED_SET, rc);
GOTO(out, rc = -ENOTSUPP);
}
+ rc = __osd_obj2dnode(o->od_os, unlink_obj, &o->od_unlinked);
+ if (rc) {
+ CERROR("%s: can't get dnode for unlinked: rc = %d\n",
+ o->od_svname, rc);
+ GOTO(out, rc);
+ }
+
out:
if (rc != 0 && o->od_os != NULL) {
- dmu_objset_disown(o->od_os, o);
+ osd_dmu_objset_disown(o->od_os, B_TRUE, o);
o->od_os = NULL;
}
RETURN(rc);
}
-static int
-osd_unlinked_object_free(struct osd_device *osd, uint64_t oid)
+int osd_unlinked_object_free(const struct lu_env *env, struct osd_device *osd,
+ uint64_t oid)
{
+ char *key = osd_oti_get(env)->oti_str;
int rc;
dmu_tx_t *tx;
+ if (osd->od_dt_dev.dd_rdonly) {
+ CERROR("%s: someone try to free objects under "
+ "readonly mode, should be disabled.\n", osd_name(osd));
+ dump_stack();
+
+ return -EROFS;
+ }
+
rc = -dmu_free_long_range(osd->od_os, oid, 0, DMU_OBJECT_END);
if (rc != 0) {
CWARN("%s: Cannot truncate %llu: rc = %d\n",
}
tx = dmu_tx_create(osd->od_os);
+ dmu_tx_mark_netfree(tx);
dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END);
- dmu_tx_hold_zap(tx, osd->od_unlinkedid, FALSE, NULL);
+ osd_tx_hold_zap(tx, osd->od_unlinked->dn_object, osd->od_unlinked,
+ FALSE, NULL);
rc = -dmu_tx_assign(tx, TXG_WAIT);
if (rc != 0) {
CWARN("%s: Cannot assign tx for %llu: rc = %d\n",
goto failed;
}
- rc = -zap_remove_int(osd->od_os, osd->od_unlinkedid, oid, tx);
+ snprintf(key, sizeof(osd_oti_get(env)->oti_str), "%llx", oid);
+ rc = osd_zap_remove(osd, osd->od_unlinked->dn_object,
+ osd->od_unlinked, key, tx);
if (rc != 0) {
CWARN("%s: Cannot remove %llu from unlinked set: rc = %d\n",
osd->od_svname, oid, rc);
zap_cursor_t zc;
zap_attribute_t *za = &osd_oti_get(env)->oti_za;
- zap_cursor_init(&zc, osd->od_os, osd->od_unlinkedid);
+ zap_cursor_init(&zc, osd->od_os, osd->od_unlinked->dn_object);
while (zap_cursor_retrieve(&zc, za) == 0) {
/* If cannot free the object, leave it in the unlinked set,
* until the OSD is mounted again when obd_unlinked_drain()
* will be called. */
- if (osd_unlinked_object_free(osd, za->za_first_integer) != 0)
+ if (osd_unlinked_object_free(env, osd, za->za_first_integer))
break;
zap_cursor_advance(&zc);
}
static int osd_mount(const struct lu_env *env,
struct osd_device *o, struct lustre_cfg *cfg)
{
- char *mntdev = lustre_cfg_string(cfg, 1);
- char *svname = lustre_cfg_string(cfg, 4);
- dmu_buf_t *rootdb;
- const char *opts;
- int rc;
+ char *mntdev = lustre_cfg_string(cfg, 1);
+ char *str = lustre_cfg_string(cfg, 2);
+ char *svname = lustre_cfg_string(cfg, 4);
+ dnode_t *rootdn;
+ const char *opts;
+ bool resetoi = false;
+ int rc;
+
ENTRY;
if (o->od_os != NULL)
if (rc >= sizeof(o->od_svname))
RETURN(-E2BIG);
+ opts = lustre_cfg_string(cfg, 3);
+
+ o->od_index_backup_stop = 0;
+ o->od_index = -1; /* -1 means index is invalid */
+ rc = server_name2index(o->od_svname, &o->od_index, NULL);
+ str = strstr(str, ":");
+ if (str) {
+ unsigned long flags;
+
+ rc = kstrtoul(str + 1, 10, &flags);
+ if (rc)
+ RETURN(-EINVAL);
+
+ if (flags & LMD_FLG_DEV_RDONLY) {
+ o->od_dt_dev.dd_rdonly = 1;
+ LCONSOLE_WARN("%s: set dev_rdonly on this device\n",
+ svname);
+ }
+
+ if (flags & LMD_FLG_NOSCRUB)
+ o->od_auto_scrub_interval = AS_NEVER;
+ }
+
if (server_name_is_ost(o->od_svname))
o->od_is_ost = 1;
o->od_xattr_in_sa = B_TRUE;
o->od_max_blksz = osd_spa_maxblocksize(o->od_os->os_spa);
+ o->od_readcache_max_filesize = OSD_MAX_CACHE_SIZE;
- rc = osd_objset_register_callbacks(o);
+ rc = __osd_obj2dnode(o->od_os, o->od_rootid, &rootdn);
if (rc)
GOTO(err, rc);
+ o->od_root = rootdn->dn_object;
+ osd_dnode_rele(rootdn);
- rc = __osd_obj2dbuf(env, o->od_os, o->od_rootid, &rootdb);
+ rc = __osd_obj2dnode(o->od_os, DMU_USERUSED_OBJECT,
+ &o->od_userused_dn);
if (rc)
GOTO(err, rc);
- o->od_root = rootdb->db_object;
- sa_buf_rele(rootdb, osd_obj_tag);
-
- /* 1. initialize oi before any file create or file open */
- rc = osd_oi_init(env, o);
+ rc = __osd_obj2dnode(o->od_os, DMU_GROUPUSED_OBJECT,
+ &o->od_groupused_dn);
if (rc)
GOTO(err, rc);
+#ifdef ZFS_PROJINHERIT
+ if (dmu_objset_projectquota_enabled(o->od_os)) {
+ rc = __osd_obj2dnode(o->od_os, DMU_PROJECTUSED_OBJECT,
+ &o->od_projectused_dn);
+ if (rc && rc != -ENOENT)
+ GOTO(err, rc);
+ }
+#endif
+
rc = lu_site_init(&o->od_site, osd2lu_dev(o));
if (rc)
GOTO(err, rc);
if (rc)
GOTO(err, rc);
- /* Use our own ZAP for inode accounting by default, this can be changed
- * via procfs to estimate the inode usage from the block usage */
- o->od_quota_iused_est = 0;
+ rc = osd_objset_register_callbacks(o);
+ if (rc)
+ GOTO(err, rc);
+
+ if (opts && strstr(opts, "resetoi"))
+ resetoi = true;
+
+ o->od_in_init = 1;
+ rc = osd_scrub_setup(env, o, resetoi);
+ o->od_in_init = 0;
+ if (rc)
+ GOTO(err, rc);
rc = osd_procfs_init(o, o->od_svname);
if (rc)
GOTO(err, rc);
- /* initialize quota slave instance */
- o->od_quota_slave = qsd_init(env, o->od_svname, &o->od_dt_dev,
- o->od_proc_entry);
- if (IS_ERR(o->od_quota_slave)) {
- rc = PTR_ERR(o->od_quota_slave);
- o->od_quota_slave = NULL;
+	/* currently there is no need to prepare qsd_instance_md for OST */
+ if (!o->od_is_ost) {
+ o->od_quota_slave_md = qsd_init(env, o->od_svname,
+ &o->od_dt_dev,
+ o->od_proc_entry, true);
+ if (IS_ERR(o->od_quota_slave_md)) {
+ rc = PTR_ERR(o->od_quota_slave_md);
+ o->od_quota_slave_md = NULL;
+ GOTO(err, rc);
+ }
+ }
+
+ o->od_quota_slave_dt = qsd_init(env, o->od_svname, &o->od_dt_dev,
+ o->od_proc_entry, false);
+
+ if (IS_ERR(o->od_quota_slave_dt)) {
+ if (o->od_quota_slave_md != NULL) {
+ qsd_fini(env, o->od_quota_slave_md);
+ o->od_quota_slave_md = NULL;
+ }
+
+ rc = PTR_ERR(o->od_quota_slave_dt);
+ o->od_quota_slave_dt = NULL;
GOTO(err, rc);
}
+#ifdef HAVE_DMU_USEROBJ_ACCOUNTING
+ if (!osd_dmu_userobj_accounting_available(o))
+ CWARN("%s: dnode accounting not enabled: "
+ "enable feature@userobj_accounting in pool\n",
+ o->od_mntdev);
+#endif
+
/* parse mount option "noacl", and enable ACL by default */
- opts = lustre_cfg_string(cfg, 3);
if (opts == NULL || strstr(opts, "noacl") == NULL)
o->od_posix_acl = 1;
osd_unlinked_drain(env, o);
err:
- if (rc) {
- dmu_objset_disown(o->od_os, o);
+ if (rc && o->od_os) {
+ osd_dmu_objset_disown(o->od_os, B_TRUE, o);
o->od_os = NULL;
}
CERROR("%s: lost %d pinned dbuf(s)\n", o->od_svname,
atomic_read(&o->od_zerocopy_pin));
+ if (o->od_unlinked) {
+ osd_dnode_rele(o->od_unlinked);
+ o->od_unlinked = NULL;
+ }
+ if (o->od_userused_dn) {
+ osd_dnode_rele(o->od_userused_dn);
+ o->od_userused_dn = NULL;
+ }
+ if (o->od_groupused_dn) {
+ osd_dnode_rele(o->od_groupused_dn);
+ o->od_groupused_dn = NULL;
+ }
+
+#ifdef ZFS_PROJINHERIT
+ if (o->od_projectused_dn) {
+ osd_dnode_rele(o->od_projectused_dn);
+ o->od_projectused_dn = NULL;
+ }
+#endif
+
if (o->od_os != NULL) {
- /* force a txg sync to get all commit callbacks */
- txg_wait_synced(dmu_objset_pool(o->od_os), 0ULL);
+ if (!o->od_dt_dev.dd_rdonly)
+ /* force a txg sync to get all commit callbacks */
+ txg_wait_synced(dmu_objset_pool(o->od_os), 0ULL);
/* close the object set */
- dmu_objset_disown(o->od_os, o);
-
+ osd_dmu_objset_disown(o->od_os, B_TRUE, o);
o->od_os = NULL;
}
l->ld_ops = &osd_lu_ops;
o->od_dt_dev.dd_ops = &osd_dt_ops;
+ sema_init(&o->od_otable_sem, 1);
+ INIT_LIST_HEAD(&o->od_ios_list);
+ o->od_auto_scrub_interval = AS_DEFAULT;
+
+	/* ZFS does not support reporting nonrotational status yet, so this flag
+ * is only set if explicitly set by the user.
+ */
+ o->od_nonrotational = 0;
out:
RETURN(rc);
INIT_LIST_HEAD(&osl->osl_seq_list);
rwlock_init(&osl->osl_seq_list_lock);
sema_init(&osl->osl_seq_init_sem, 1);
+ INIT_LIST_HEAD(&dev->od_index_backup_list);
+ INIT_LIST_HEAD(&dev->od_index_restore_list);
+ spin_lock_init(&dev->od_lock);
+ dev->od_index_backup_policy = LIBP_NONE;
rc = dt_device_init(&dev->od_dt_dev, type);
if (rc == 0) {
/* XXX: make osd top device in order to release reference */
d->ld_site->ls_top_dev = d;
lu_site_purge(env, d->ld_site, -1);
- if (!cfs_hash_is_empty(d->ld_site->ls_obj_hash)) {
- LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
- lu_site_print(env, d->ld_site, &msgdata, lu_cdebug_printer);
- }
+ lu_site_print(env, d->ld_site, &d->ld_site->ls_obj_hash.nelems,
+ D_ERROR, lu_cdebug_printer);
lu_site_fini(&o->od_site);
dt_device_fini(&o->od_dt_dev);
OBD_FREE_PTR(o);
int rc;
ENTRY;
-
- osd_shutdown(env, o);
- osd_oi_fini(env, o);
-
+ osd_index_backup(env, o, false);
if (o->od_os) {
osd_objset_unregister_callbacks(o);
- osd_sync(env, lu2dt_dev(d));
- txg_wait_callbacks(spa_get_dsl(dmu_objset_spa(o->od_os)));
+ if (!o->od_dt_dev.dd_rdonly) {
+ osd_sync(env, lu2dt_dev(d));
+ txg_wait_callbacks(
+ spa_get_dsl(dmu_objset_spa(o->od_os)));
+ }
}
+	/* now with all the callbacks completed we can clean up the remaining state */
+ osd_shutdown(env, o);
+ osd_scrub_cleanup(env, o);
+
rc = osd_procfs_fini(o);
if (rc) {
CERROR("proc fini error %d\n", rc);
static int osd_process_config(const struct lu_env *env,
struct lu_device *d, struct lustre_cfg *cfg)
{
- struct osd_device *o = osd_dev(d);
- int rc;
- ENTRY;
+ struct osd_device *o = osd_dev(d);
+ ssize_t count;
+ int rc;
+ ENTRY;
switch(cfg->lcfg_command) {
case LCFG_SETUP:
rc = osd_mount(env, o, cfg);
break;
case LCFG_CLEANUP:
+		/* Handle the case where LCFG_PRE_CLEANUP was not called in
+		 * advance, which may happen if a failure is hit during the
+		 * mount process. */
+ osd_index_backup(env, o, false);
rc = osd_shutdown(env, o);
break;
case LCFG_PARAM: {
LASSERT(&o->od_dt_dev);
- rc = class_process_proc_param(PARAM_OSD, lprocfs_osd_obd_vars,
- cfg, &o->od_dt_dev);
- if (rc > 0 || rc == -ENOSYS)
- rc = class_process_proc_param(PARAM_OST,
- lprocfs_osd_obd_vars,
- cfg, &o->od_dt_dev);
+ count = class_modify_config(cfg, PARAM_OSD,
+ &o->od_dt_dev.dd_kobj);
+ if (count < 0)
+ count = class_modify_config(cfg, PARAM_OST,
+ &o->od_dt_dev.dd_kobj);
+ rc = count > 0 ? 0 : count;
break;
}
+ case LCFG_PRE_CLEANUP:
+ osd_scrub_stop(o);
+ osd_index_backup(env, o,
+ o->od_index_backup_policy != LIBP_NONE);
+ rc = 0;
+ break;
default:
rc = -ENOTTY;
}
int rc = 0;
ENTRY;
- if (osd->od_quota_slave == NULL)
+ if (osd->od_quota_slave_md == NULL && osd->od_quota_slave_dt == NULL)
RETURN(0);
/* start qsd instance on recovery completion, this notifies the quota
* slave code that we are about to process new requests now */
- rc = qsd_start(env, osd->od_quota_slave);
+ rc = qsd_start(env, osd->od_quota_slave_dt);
+ if (rc == 0 && osd->od_quota_slave_md != NULL)
+ rc = qsd_start(env, osd->od_quota_slave_md);
RETURN(rc);
}
static int osd_fid_init(const struct lu_env *env, struct osd_device *osd)
{
- struct seq_server_site *ss = osd_seq_site(osd);
- int rc;
+ struct seq_server_site *ss = osd_seq_site(osd);
+ int rc = 0;
ENTRY;
if (osd->od_is_ost || osd->od_cl_seq != NULL)
if (osd->od_cl_seq == NULL)
RETURN(-ENOMEM);
- rc = seq_client_init(osd->od_cl_seq, NULL, LUSTRE_SEQ_METADATA,
- osd->od_svname, ss->ss_server_seq);
+ seq_client_init(osd->od_cl_seq, NULL, LUSTRE_SEQ_METADATA,
+ osd->od_svname, ss->ss_server_seq);
- if (rc != 0) {
- OBD_FREE_PTR(osd->od_cl_seq);
- osd->od_cl_seq = NULL;
+ if (ss->ss_node_id == 0) {
+ /*
+		 * If the OSD is on the sequence controller (MDT0), then
+		 * allocate the sequence here; otherwise allocate it after
+		 * connecting to MDT0 (see mdt_register_lwp_callback()).
+ */
+ rc = seq_server_alloc_meta(osd->od_cl_seq->lcs_srv,
+ &osd->od_cl_seq->lcs_space, env);
}
RETURN(rc);
int rc = 0;
ENTRY;
- if (osd->od_quota_slave != NULL) {
+ if (osd->od_quota_slave_md != NULL) {
/* set up quota slave objects */
- rc = qsd_prepare(env, osd->od_quota_slave);
+ rc = qsd_prepare(env, osd->od_quota_slave_md);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ if (osd->od_quota_slave_dt != NULL) {
+ /* set up quota slave objects */
+ rc = qsd_prepare(env, osd->od_quota_slave_dt);
if (rc != 0)
RETURN(rc);
}
RETURN(rc);
}
-struct lu_device_operations osd_lu_ops = {
+/**
+ * Implementation of lu_device_operations::ldo_fid_alloc() for OSD
+ *
+ * Allocate FID.
+ *
+ * see include/lu_object.h for the details.
+ */
+static int osd_fid_alloc(const struct lu_env *env, struct lu_device *d,
+ struct lu_fid *fid, struct lu_object *parent,
+ const struct lu_name *name)
+{
+ struct osd_device *osd = osd_dev(d);
+
+ return seq_client_alloc_fid(env, osd->od_cl_seq, fid);
+}
+
+const struct lu_device_operations osd_lu_ops = {
.ldo_object_alloc = osd_object_alloc,
.ldo_process_config = osd_process_config,
.ldo_recovery_complete = osd_recovery_complete,
.ldo_prepare = osd_prepare,
+ .ldo_fid_alloc = osd_fid_alloc,
};
static void osd_type_start(struct lu_device_type *t)
{
}
-int osd_fid_alloc(const struct lu_env *env, struct obd_export *exp,
- struct lu_fid *fid, struct md_op_data *op_data)
-{
- struct osd_device *osd = osd_dev(exp->exp_obd->obd_lu_dev);
-
- return seq_client_alloc_fid(env, osd->od_cl_seq, fid);
-}
-
-static struct lu_device_type_operations osd_device_type_ops = {
+static const struct lu_device_type_operations osd_device_type_ops = {
.ldto_init = osd_type_init,
.ldto_fini = osd_type_fini,
};
-static struct obd_ops osd_obd_device_ops = {
+static const struct obd_ops osd_obd_device_ops = {
.o_owner = THIS_MODULE,
.o_connect = osd_obd_connect,
.o_disconnect = osd_obd_disconnect,
- .o_fid_alloc = osd_fid_alloc
};
static int __init osd_init(void)
if (rc)
return rc;
- rc = class_register_type(&osd_obd_device_ops, NULL, true, NULL,
+ rc = class_register_type(&osd_obd_device_ops, NULL, true,
LUSTRE_OSD_ZFS_NAME, &osd_device_type);
if (rc)
lu_kmem_fini(osd_caches);
lu_kmem_fini(osd_caches);
}
-extern unsigned int osd_oi_count;
module_param(osd_oi_count, int, 0444);
MODULE_PARM_DESC(osd_oi_count, "Number of Object Index containers to be created, it's only valid for new filesystem.");
+module_param(osd_txg_sync_delay_us, int, 0644);
+MODULE_PARM_DESC(osd_txg_sync_delay_us,
+ "When zero or larger delay N usec instead of doing TXG sync");
+
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_ZFS_NAME")");
MODULE_VERSION(LUSTRE_VERSION_STRING);