osd_dt_dev(th->th_dev)->od_svname, th, error);
}
- dt_txn_hook_commit(th);
-
/* call per-transaction callbacks if any */
list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) {
LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
* should be released. Quota space won't be adjusted at this point since
* we can't provide a suitable environment. It will be performed
* asynchronously by a lquota thread. */
- qsd_op_end(NULL, osd->od_quota_slave, &oh->ot_quota_trans);
+ qsd_op_end(NULL, osd->od_quota_slave_dt, &oh->ot_quota_trans);
+ if (osd->od_quota_slave_md != NULL)
+ qsd_op_end(NULL, osd->od_quota_slave_md, &oh->ot_quota_trans);
lu_device_put(lud);
th->th_dev = NULL;
static int osd_trans_start(const struct lu_env *env, struct dt_device *d,
struct thandle *th)
{
- struct osd_thandle *oh;
- int rc;
+ struct osd_device *osd = osd_dt_dev(d);
+ struct osd_thandle *oh;
+ int rc;
+
ENTRY;
oh = container_of0(th, struct osd_thandle, ot_super);
LASSERT(oh->ot_tx);
rc = dt_txn_hook_start(env, d, th);
- if (rc != 0)
+ if (rc != 0) {
+ CERROR("%s: dt_txn_hook_start failed: rc = %d\n",
+ osd->od_svname, rc);
RETURN(rc);
+ }
- if (oh->ot_write_commit && OBD_FAIL_CHECK(OBD_FAIL_OST_MAPBLK_ENOSPC))
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSD_TXN_START))
/* Unlike ldiskfs, ZFS checks for available space and returns
* -ENOSPC when assigning txg */
- RETURN(-ENOSPC);
+ RETURN(-EIO);
rc = -dmu_tx_assign(oh->ot_tx, TXG_WAIT);
if (unlikely(rc != 0)) {
- struct osd_device *osd = osd_dt_dev(d);
/* dmu will call commit callback with error code during abort */
if (!lu_device_is_md(&d->dd_lu_dev) && rc == -ENOSPC)
CERROR("%s: failed to start transaction due to ENOSPC"
struct osd_device *osd = osd_dt_dev(th->th_dev);
bool sync = (th->th_sync != 0);
struct osd_thandle *oh;
- struct list_head unlinked;
+ LIST_HEAD(unlinked);
uint64_t txg;
int rc;
ENTRY;
oh = container_of0(th, struct osd_thandle, ot_super);
- INIT_LIST_HEAD(&unlinked);
list_splice_init(&oh->ot_unlinked_list, &unlinked);
+
+ osd_oti_get(env)->oti_ins_cache_depth--;
/* reset OI cache for safety */
- osd_oti_get(env)->oti_ins_cache_used = 0;
+ if (osd_oti_get(env)->oti_ins_cache_depth == 0)
+ osd_oti_get(env)->oti_ins_cache_used = 0;
if (oh->ot_assigned == 0) {
LASSERT(oh->ot_tx);
+ CDEBUG(D_OTHER, "%s: transaction is aborted\n", osd->od_svname);
+ osd_trans_stop_cb(oh, th->th_result);
dmu_tx_abort(oh->ot_tx);
osd_object_sa_dirty_rele(env, oh);
osd_unlinked_list_emptify(env, osd, &unlinked, false);
/* there won't be any commit, release reserved quota space now,
* if any */
- qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
+ qsd_op_end(env, osd->od_quota_slave_dt, &oh->ot_quota_trans);
+ if (osd->od_quota_slave_md != NULL)
+ qsd_op_end(env, osd->od_quota_slave_md,
+ &oh->ot_quota_trans);
OBD_FREE_PTR(oh);
RETURN(0);
}
th = &oh->ot_super;
th->th_dev = dt;
th->th_result = 0;
+
+ osd_oti_get(env)->oti_ins_cache_depth++;
+
RETURN(th);
}
* gradually disappears as the number of real dnodes grows. It also
* avoids the need to check for divide-by-zero computing dn_per_block.
*/
- CLASSERT(OSD_DNODE_MIN_BLKSHIFT > 0);
- CLASSERT(OSD_DNODE_EST_BLKSHIFT > 0);
+ BUILD_BUG_ON(OSD_DNODE_MIN_BLKSHIFT <= 0);
+ BUILD_BUG_ON(OSD_DNODE_EST_BLKSHIFT <= 0);
est_usedblocks = ((OSD_DNODE_EST_COUNT << OSD_DNODE_EST_BLKSHIFT) +
usedbytes) >> est_maxblockshift;
* Reserve 0.78% of total space, at least 16MB for small filesystems,
* for internal files to be created/unlinked when space is tight.
*/
- CLASSERT(OSD_STATFS_RESERVED_SIZE > 0);
+ BUILD_BUG_ON(OSD_STATFS_RESERVED_SIZE <= 0);
reserved = OSD_STATFS_RESERVED_SIZE >> bshift;
if (likely(osfs->os_blocks >= reserved << OSD_STATFS_RESERVED_SHIFT))
reserved = osfs->os_blocks >> OSD_STATFS_RESERVED_SHIFT;
* Concurrency: shouldn't matter.
*/
int osd_statfs(const struct lu_env *env, struct dt_device *d,
- struct obd_statfs *osfs)
+ struct obd_statfs *osfs, struct obd_statfs_info *info)
{
- int rc;
+ struct osd_device *osd = osd_dt_dev(d);
+ int rc;
ENTRY;
- rc = osd_objset_statfs(osd_dt_dev(d), osfs);
+ rc = osd_objset_statfs(osd, osfs);
if (unlikely(rc != 0))
RETURN(rc);
osfs->os_bavail -= min_t(u64,
OSD_GRANT_FOR_LOCAL_OIDS / osfs->os_bsize,
osfs->os_bavail);
+
+	/* ZFS does not support reporting nonrotational status yet, so return
+	 * the flag only if the user has set nonrotational.
+	 */
+ osfs->os_state |= osd->od_nonrotational ? OS_STATE_NONROT : 0;
+
RETURN(0);
}
/* nr_blkptrshift is the log2 of the number of block pointers that can
* be stored in an indirect block */
- CLASSERT(DN_MAX_INDBLKSHIFT > SPA_BLKPTRSHIFT);
+ BUILD_BUG_ON(DN_MAX_INDBLKSHIFT <= SPA_BLKPTRSHIFT);
nr_blkptrshift = DN_MAX_INDBLKSHIFT - SPA_BLKPTRSHIFT;
/* max_blockshift / nr_blkptrshift is thus the maximum depth of the
param->ddp_mntopts = MNTOPT_USERXATTR;
if (osd->od_posix_acl)
param->ddp_mntopts |= MNTOPT_ACL;
- param->ddp_max_ea_size = DXATTR_MAX_ENTRY_SIZE;
+ /* Previously DXATTR_MAX_ENTRY_SIZE */
+ param->ddp_max_ea_size = OBD_MAX_EA_SIZE;
/* for maxbytes, report same value as ZPL */
param->ddp_maxbytes = MAX_LFS_FILESIZE;
ENTRY;
/* shutdown quota slave instance associated with the device */
- if (o->od_quota_slave != NULL) {
+ if (o->od_quota_slave_md != NULL) {
/* complete all in-flight callbacks */
osd_sync(env, &o->od_dt_dev);
txg_wait_callbacks(spa_get_dsl(dmu_objset_spa(o->od_os)));
- qsd_fini(env, o->od_quota_slave);
- o->od_quota_slave = NULL;
+ qsd_fini(env, o->od_quota_slave_md);
+ o->od_quota_slave_md = NULL;
}
+ if (o->od_quota_slave_dt != NULL) {
+ /* complete all in-flight callbacks */
+ osd_sync(env, &o->od_dt_dev);
+ txg_wait_callbacks(spa_get_dsl(dmu_objset_spa(o->od_os)));
+ qsd_fini(env, o->od_quota_slave_dt);
+ o->od_quota_slave_dt = NULL;
+ }
osd_fid_fini(env, o);
RETURN(0);
rc = -osd_dmu_objset_own(o->od_mntdev, DMU_OST_ZFS,
o->od_dt_dev.dd_rdonly ? B_TRUE : B_FALSE,
- B_FALSE, o, &o->od_os);
+ B_TRUE, o, &o->od_os);
if (rc) {
CERROR("%s: can't open %s\n", o->od_svname, o->od_mntdev);
out:
if (rc != 0 && o->od_os != NULL) {
- osd_dmu_objset_disown(o->od_os, B_FALSE, o);
+ osd_dmu_objset_disown(o->od_os, B_TRUE, o);
o->od_os = NULL;
}
}
tx = dmu_tx_create(osd->od_os);
+ dmu_tx_mark_netfree(tx);
dmu_tx_hold_free(tx, oid, 0, DMU_OBJECT_END);
osd_tx_hold_zap(tx, osd->od_unlinked->dn_object, osd->od_unlinked,
FALSE, NULL);
o->od_xattr_in_sa = B_TRUE;
o->od_max_blksz = osd_spa_maxblocksize(o->od_os->os_spa);
+ o->od_readcache_max_filesize = OSD_MAX_CACHE_SIZE;
rc = __osd_obj2dnode(o->od_os, o->od_rootid, &rootdn);
if (rc)
if (rc)
GOTO(err, rc);
- /* initialize quota slave instance */
- o->od_quota_slave = qsd_init(env, o->od_svname, &o->od_dt_dev,
- o->od_proc_entry);
- if (IS_ERR(o->od_quota_slave)) {
- rc = PTR_ERR(o->od_quota_slave);
- o->od_quota_slave = NULL;
+ /* currently it's no need to prepare qsd_instance_md for OST */
+ if (!o->od_is_ost) {
+ o->od_quota_slave_md = qsd_init(env, o->od_svname,
+ &o->od_dt_dev,
+ o->od_proc_entry, true);
+ if (IS_ERR(o->od_quota_slave_md)) {
+ rc = PTR_ERR(o->od_quota_slave_md);
+ o->od_quota_slave_md = NULL;
+ GOTO(err, rc);
+ }
+ }
+
+ o->od_quota_slave_dt = qsd_init(env, o->od_svname, &o->od_dt_dev,
+ o->od_proc_entry, false);
+
+ if (IS_ERR(o->od_quota_slave_dt)) {
+ if (o->od_quota_slave_md != NULL) {
+ qsd_fini(env, o->od_quota_slave_md);
+ o->od_quota_slave_md = NULL;
+ }
+
+ rc = PTR_ERR(o->od_quota_slave_dt);
+ o->od_quota_slave_dt = NULL;
GOTO(err, rc);
}
osd_unlinked_drain(env, o);
err:
if (rc && o->od_os) {
- osd_dmu_objset_disown(o->od_os, B_FALSE, o);
+ osd_dmu_objset_disown(o->od_os, B_TRUE, o);
o->od_os = NULL;
}
txg_wait_synced(dmu_objset_pool(o->od_os), 0ULL);
/* close the object set */
- osd_dmu_objset_disown(o->od_os, B_FALSE, o);
+ osd_dmu_objset_disown(o->od_os, B_TRUE, o);
o->od_os = NULL;
}
INIT_LIST_HEAD(&o->od_ios_list);
o->od_auto_scrub_interval = AS_DEFAULT;
+	/* ZFS does not support reporting nonrotational status yet, so this flag
+	 * is only set if explicitly set by the user.
+	 */
+ o->od_nonrotational = 0;
+
out:
RETURN(rc);
}
static int osd_process_config(const struct lu_env *env,
struct lu_device *d, struct lustre_cfg *cfg)
{
- struct osd_device *o = osd_dev(d);
- int rc;
- ENTRY;
+ struct osd_device *o = osd_dev(d);
+ ssize_t count;
+ int rc;
+ ENTRY;
switch(cfg->lcfg_command) {
case LCFG_SETUP:
rc = osd_mount(env, o, cfg);
break;
case LCFG_PARAM: {
LASSERT(&o->od_dt_dev);
- rc = class_process_proc_param(PARAM_OSD, lprocfs_osd_obd_vars,
- cfg, &o->od_dt_dev);
- if (rc > 0 || rc == -ENOSYS) {
- rc = class_process_proc_param(PARAM_OST,
- lprocfs_osd_obd_vars,
- cfg, &o->od_dt_dev);
- if (rc > 0)
- rc = 0;
- }
+ count = class_modify_config(cfg, PARAM_OSD,
+ &o->od_dt_dev.dd_kobj);
+ if (count < 0)
+ count = class_modify_config(cfg, PARAM_OST,
+ &o->od_dt_dev.dd_kobj);
+ rc = count > 0 ? 0 : count;
break;
}
case LCFG_PRE_CLEANUP:
int rc = 0;
ENTRY;
- if (osd->od_quota_slave == NULL)
+ if (osd->od_quota_slave_md == NULL && osd->od_quota_slave_dt == NULL)
RETURN(0);
/* start qsd instance on recovery completion, this notifies the quota
* slave code that we are about to process new requests now */
- rc = qsd_start(env, osd->od_quota_slave);
+ rc = qsd_start(env, osd->od_quota_slave_dt);
+ if (rc == 0 && osd->od_quota_slave_md != NULL)
+ rc = qsd_start(env, osd->od_quota_slave_md);
RETURN(rc);
}
osd->od_cl_seq = NULL;
}
+ if (ss->ss_node_id == 0) {
+ /*
+ * If the OSD on the sequence controller(MDT0), then allocate
+ * sequence here, otherwise allocate sequence after connected
+ * to MDT0 (see mdt_register_lwp_callback()).
+ */
+ rc = seq_server_alloc_meta(osd->od_cl_seq->lcs_srv,
+ &osd->od_cl_seq->lcs_space, env);
+ }
+
RETURN(rc);
}
int rc = 0;
ENTRY;
- if (osd->od_quota_slave != NULL) {
+ if (osd->od_quota_slave_md != NULL) {
+ /* set up quota slave objects */
+ rc = qsd_prepare(env, osd->od_quota_slave_md);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ if (osd->od_quota_slave_dt != NULL) {
/* set up quota slave objects */
- rc = qsd_prepare(env, osd->od_quota_slave);
+ rc = qsd_prepare(env, osd->od_quota_slave_dt);
if (rc != 0)
RETURN(rc);
}
};
-static struct obd_ops osd_obd_device_ops = {
+static const struct obd_ops osd_obd_device_ops = {
.o_owner = THIS_MODULE,
.o_connect = osd_obd_connect,
.o_disconnect = osd_obd_disconnect,