* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2016, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
struct lu_context_key osd_key;
+static int osd_txg_sync_delay_us = -1;
+
/* Slab for OSD object allocation */
struct kmem_cache *osd_object_kmem;
* should be released. Quota space won't be adjusted at this point since
* we can't provide a suitable environment. It will be performed
* asynchronously by a lquota thread. */
- qsd_op_end(NULL, osd->od_quota_slave, &oh->ot_quota_trans);
+ qsd_op_end(NULL, osd->od_quota_slave_dt, &oh->ot_quota_trans);
+ if (osd->od_quota_slave_md != NULL)
+ qsd_op_end(NULL, osd->od_quota_slave_md, &oh->ot_quota_trans);
lu_device_put(lud);
th->th_dev = NULL;
static int osd_trans_start(const struct lu_env *env, struct dt_device *d,
struct thandle *th)
{
- struct osd_thandle *oh;
- int rc;
+ struct osd_device *osd = osd_dt_dev(d);
+ struct osd_thandle *oh;
+ int rc;
+
ENTRY;
oh = container_of0(th, struct osd_thandle, ot_super);
LASSERT(oh->ot_tx);
rc = dt_txn_hook_start(env, d, th);
- if (rc != 0)
+ if (rc != 0) {
+ CERROR("%s: dt_txn_hook_start failed: rc = %d\n",
+ osd->od_svname, rc);
RETURN(rc);
+ }
- if (oh->ot_write_commit && OBD_FAIL_CHECK(OBD_FAIL_OST_MAPBLK_ENOSPC))
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSD_TXN_START))
/* Unlike ldiskfs, ZFS checks for available space and returns
* -ENOSPC when assigning txg */
- RETURN(-ENOSPC);
+ RETURN(-EIO);
rc = -dmu_tx_assign(oh->ot_tx, TXG_WAIT);
if (unlikely(rc != 0)) {
- struct osd_device *osd = osd_dt_dev(d);
/* dmu will call commit callback with error code during abort */
if (!lu_device_is_md(&d->dd_lu_dev) && rc == -ENOSPC)
CERROR("%s: failed to start transaction due to ENOSPC"
/* add commit callback */
dmu_tx_callback_register(oh->ot_tx, osd_trans_commit_cb, oh);
oh->ot_assigned = 1;
+ osd_oti_get(env)->oti_in_trans = 1;
lu_device_get(&d->dd_lu_dev);
}
oh = container_of0(th, struct osd_thandle, ot_super);
INIT_LIST_HEAD(&unlinked);
list_splice_init(&oh->ot_unlinked_list, &unlinked);
+
+ osd_oti_get(env)->oti_ins_cache_depth--;
/* reset OI cache for safety */
- osd_oti_get(env)->oti_ins_cache_used = 0;
+ if (osd_oti_get(env)->oti_ins_cache_depth == 0)
+ osd_oti_get(env)->oti_ins_cache_used = 0;
if (oh->ot_assigned == 0) {
LASSERT(oh->ot_tx);
+ CDEBUG(D_OTHER, "%s: transaction is aborted\n", osd->od_svname);
+ osd_trans_stop_cb(oh, th->th_result);
dmu_tx_abort(oh->ot_tx);
osd_object_sa_dirty_rele(env, oh);
osd_unlinked_list_emptify(env, osd, &unlinked, false);
/* there won't be any commit, release reserved quota space now,
* if any */
- qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
+ qsd_op_end(env, osd->od_quota_slave_dt, &oh->ot_quota_trans);
+ if (osd->od_quota_slave_md != NULL)
+ qsd_op_end(env, osd->od_quota_slave_md,
+ &oh->ot_quota_trans);
OBD_FREE_PTR(oh);
RETURN(0);
}
/* XXX: Once dmu_tx_commit() called, oh/th could have been freed
* by osd_trans_commit_cb already. */
dmu_tx_commit(oh->ot_tx);
+ osd_oti_get(env)->oti_in_trans = 0;
osd_unlinked_list_emptify(env, osd, &unlinked, true);
- if (sync)
- txg_wait_synced(dmu_objset_pool(osd->od_os), txg);
+ if (sync) {
+ if (osd_txg_sync_delay_us < 0)
+ txg_wait_synced(dmu_objset_pool(osd->od_os), txg);
+ else
+ udelay(osd_txg_sync_delay_us);
+ }
RETURN(rc);
}
th = &oh->ot_super;
th->th_dev = dt;
th->th_result = 0;
+
+ osd_oti_get(env)->oti_ins_cache_depth++;
+
RETURN(th);
}
param->ddp_max_extent_blks =
(1 << (DN_MAX_INDBLKSHIFT - SPA_BLKPTRSHIFT));
param->ddp_extent_tax = osd_blk_insert_cost(osd);
+
+ /* Preferred RPC size for efficient disk IO. 1MB shows good
+ * all-around performance for ZFS, but use blocksize (recordsize)
+ * by default if larger to avoid read-modify-write. */
+ if (osd->od_max_blksz > ONE_MB_BRW_SIZE)
+ param->ddp_brw_size = osd->od_max_blksz;
+ else
+ param->ddp_brw_size = ONE_MB_BRW_SIZE;
}
/*
ENTRY;
/* shutdown quota slave instance associated with the device */
- if (o->od_quota_slave != NULL) {
+ if (o->od_quota_slave_md != NULL) {
/* complete all in-flight callbacks */
osd_sync(env, &o->od_dt_dev);
txg_wait_callbacks(spa_get_dsl(dmu_objset_spa(o->od_os)));
- qsd_fini(env, o->od_quota_slave);
- o->od_quota_slave = NULL;
+ qsd_fini(env, o->od_quota_slave_md);
+ o->od_quota_slave_md = NULL;
}
+ if (o->od_quota_slave_dt != NULL) {
+ /* complete all in-flight callbacks */
+ osd_sync(env, &o->od_dt_dev);
+ txg_wait_callbacks(spa_get_dsl(dmu_objset_spa(o->od_os)));
+ qsd_fini(env, o->od_quota_slave_dt);
+ o->od_quota_slave_dt = NULL;
+ }
osd_fid_fini(env, o);
RETURN(0);
osd->od_prop_rdonly = !!newval;
}
+#ifdef HAVE_DMU_OBJECT_ALLOC_DNSIZE
static void osd_dnodesize_changed_cb(void *arg, uint64_t newval)
{
struct osd_device *osd = arg;
osd->od_dnsize = newval;
}
-
+#endif
/*
* This function unregisters all registered callbacks. It's harmless to
* unregister callbacks that were never registered so it is used to safely
osd_recordsize_changed_cb, o);
(void) dsl_prop_unregister(ds, zfs_prop_to_name(ZFS_PROP_READONLY),
osd_readonly_changed_cb, o);
+#ifdef HAVE_DMU_OBJECT_ALLOC_DNSIZE
(void) dsl_prop_unregister(ds, zfs_prop_to_name(ZFS_PROP_DNODESIZE),
- osd_readonly_changed_cb, o);
+ osd_dnodesize_changed_cb, o);
+#endif
if (o->arc_prune_cb != NULL) {
arc_remove_prune_callback(o->arc_prune_cb);
if (rc)
GOTO(err, rc);
+#ifdef HAVE_DMU_OBJECT_ALLOC_DNSIZE
rc = -dsl_prop_register(ds, zfs_prop_to_name(ZFS_PROP_DNODESIZE),
osd_dnodesize_changed_cb, o);
if (rc)
GOTO(err, rc);
+#endif
o->arc_prune_cb = arc_add_prune_callback(arc_prune_func, o);
err:
}
tx = dmu_tx_create(osd->od_os);
+ dmu_tx_mark_netfree(tx);
dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END);
osd_tx_hold_zap(tx, osd->od_unlinked->dn_object, osd->od_unlinked,
FALSE, NULL);
if (rc >= sizeof(o->od_svname))
RETURN(-E2BIG);
+ o->od_index_backup_stop = 0;
+ o->od_index = -1; /* -1 means index is invalid */
+ rc = server_name2index(o->od_svname, &o->od_index, NULL);
str = strstr(str, ":");
if (str) {
unsigned long flags;
LCONSOLE_WARN("%s: set dev_rdonly on this device\n",
svname);
}
+
+ if (flags & LMD_FLG_NOSCRUB)
+ o->od_auto_scrub_interval = AS_NEVER;
}
if (server_name_is_ost(o->od_svname))
o->od_xattr_in_sa = B_TRUE;
o->od_max_blksz = osd_spa_maxblocksize(o->od_os->os_spa);
+ o->od_readcache_max_filesize = OSD_MAX_CACHE_SIZE;
rc = __osd_obj2dnode(o->od_os, o->od_rootid, &rootdn);
if (rc)
}
#endif
- /* 1. initialize oi before any file create or file open */
- rc = osd_oi_init(env, o);
- if (rc)
- GOTO(err, rc);
-
rc = lu_site_init(&o->od_site, osd2lu_dev(o));
if (rc)
GOTO(err, rc);
if (rc)
GOTO(err, rc);
+ o->od_in_init = 1;
+ rc = osd_scrub_setup(env, o);
+ o->od_in_init = 0;
+ if (rc)
+ GOTO(err, rc);
+
rc = osd_procfs_init(o, o->od_svname);
if (rc)
GOTO(err, rc);
- /* initialize quota slave instance */
- o->od_quota_slave = qsd_init(env, o->od_svname, &o->od_dt_dev,
- o->od_proc_entry);
- if (IS_ERR(o->od_quota_slave)) {
- rc = PTR_ERR(o->od_quota_slave);
- o->od_quota_slave = NULL;
+	/* currently there is no need to prepare qsd_instance_md for OST */
+ if (!o->od_is_ost) {
+ o->od_quota_slave_md = qsd_init(env, o->od_svname,
+ &o->od_dt_dev,
+ o->od_proc_entry, true);
+ if (IS_ERR(o->od_quota_slave_md)) {
+ rc = PTR_ERR(o->od_quota_slave_md);
+ o->od_quota_slave_md = NULL;
+ GOTO(err, rc);
+ }
+ }
+
+ o->od_quota_slave_dt = qsd_init(env, o->od_svname, &o->od_dt_dev,
+ o->od_proc_entry, false);
+
+ if (IS_ERR(o->od_quota_slave_dt)) {
+ if (o->od_quota_slave_md != NULL) {
+ qsd_fini(env, o->od_quota_slave_md);
+ o->od_quota_slave_md = NULL;
+ }
+
+ rc = PTR_ERR(o->od_quota_slave_dt);
+ o->od_quota_slave_dt = NULL;
GOTO(err, rc);
}
l->ld_ops = &osd_lu_ops;
o->od_dt_dev.dd_ops = &osd_dt_ops;
+ sema_init(&o->od_otable_sem, 1);
+ INIT_LIST_HEAD(&o->od_ios_list);
+ o->od_auto_scrub_interval = AS_DEFAULT;
out:
RETURN(rc);
INIT_LIST_HEAD(&osl->osl_seq_list);
rwlock_init(&osl->osl_seq_list_lock);
sema_init(&osl->osl_seq_init_sem, 1);
+ INIT_LIST_HEAD(&dev->od_index_backup_list);
+ INIT_LIST_HEAD(&dev->od_index_restore_list);
+ spin_lock_init(&dev->od_lock);
+ dev->od_index_backup_policy = LIBP_NONE;
rc = dt_device_init(&dev->od_dt_dev, type);
if (rc == 0) {
/* now with all the callbacks completed we can cleanup the remainings */
osd_shutdown(env, o);
- osd_oi_fini(env, o);
+ osd_scrub_cleanup(env, o);
rc = osd_procfs_fini(o);
if (rc) {
rc = osd_mount(env, o, cfg);
break;
case LCFG_CLEANUP:
+	/* For the case LCFG_PRE_CLEANUP is not called in advance,
+	 * which may happen if we hit a failure during the mount process. */
+ osd_index_backup(env, o, false);
rc = osd_shutdown(env, o);
break;
case LCFG_PARAM: {
}
break;
}
+ case LCFG_PRE_CLEANUP:
+ osd_scrub_stop(o);
+ osd_index_backup(env, o,
+ o->od_index_backup_policy != LIBP_NONE);
+ rc = 0;
+ break;
default:
rc = -ENOTTY;
}
int rc = 0;
ENTRY;
- if (osd->od_quota_slave == NULL)
+ if (osd->od_quota_slave_md == NULL && osd->od_quota_slave_dt == NULL)
RETURN(0);
/* start qsd instance on recovery completion, this notifies the quota
* slave code that we are about to process new requests now */
- rc = qsd_start(env, osd->od_quota_slave);
+ rc = qsd_start(env, osd->od_quota_slave_dt);
+ if (rc == 0 && osd->od_quota_slave_md != NULL)
+ rc = qsd_start(env, osd->od_quota_slave_md);
RETURN(rc);
}
osd->od_cl_seq = NULL;
}
+ if (ss->ss_node_id == 0) {
+	/*
+	 * If the OSD is on the sequence controller (MDT0), then allocate
+	 * the sequence here; otherwise allocate the sequence after
+	 * connecting to MDT0 (see mdt_register_lwp_callback()).
+	 */
+ rc = seq_server_alloc_meta(osd->od_cl_seq->lcs_srv,
+ &osd->od_cl_seq->lcs_space, env);
+ }
+
RETURN(rc);
}
int rc = 0;
ENTRY;
- if (osd->od_quota_slave != NULL) {
+ if (osd->od_quota_slave_md != NULL) {
/* set up quota slave objects */
- rc = qsd_prepare(env, osd->od_quota_slave);
+ rc = qsd_prepare(env, osd->od_quota_slave_md);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ if (osd->od_quota_slave_dt != NULL) {
+ /* set up quota slave objects */
+ rc = qsd_prepare(env, osd->od_quota_slave_dt);
if (rc != 0)
RETURN(rc);
}
lu_kmem_fini(osd_caches);
}
-extern unsigned int osd_oi_count;
module_param(osd_oi_count, int, 0444);
MODULE_PARM_DESC(osd_oi_count, "Number of Object Index containers to be created, it's only valid for new filesystem.");
+module_param(osd_txg_sync_delay_us, int, 0644);
+MODULE_PARM_DESC(osd_txg_sync_delay_us,
+ "When zero or larger delay N usec instead of doing TXG sync");
+
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_ZFS_NAME")");
MODULE_VERSION(LUSTRE_VERSION_STRING);