* Use is subject to license terms.
*/
/*
- * Copyright (c) 2011, 2012 Whamcloud, Inc.
+ * Copyright (c) 2012, 2013, Intel Corporation.
* Use is subject to license terms.
*
*/
* Author: Johann Lombardi <johann@whamcloud.com>
*/
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
#define DEBUG_SUBSYSTEM S_OSD
#include <lustre_ver.h>
#include <obd_class.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
+#include <lustre_param.h>
+#include <md_object.h>
#include "osd_internal.h"
static char *root_tag = "osd_mount, rootdb"; /* tag handed to __osd_obj2dbuf() for the root dbuf */
/* Slab for OSD object allocation */
-cfs_mem_cache_t *osd_object_kmem;
+struct kmem_cache *osd_object_kmem;
static struct lu_kmem_descr osd_caches[] = {
{
dt_txn_hook_commit(th);
/* call per-transaction callbacks if any */
- cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
+ list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
dcb->dcb_func(NULL, th, dcb, error);
/* Unlike ldiskfs, zfs updates space accounting at commit time.
th->th_dev = NULL;
lu_context_exit(&th->th_ctx);
lu_context_fini(&th->th_ctx);
- OBD_FREE_PTR(oh);
+ thandle_put(&oh->ot_super);
EXIT;
}
struct osd_thandle *oh;
oh = container_of0(th, struct osd_thandle, ot_super);
- cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
+ list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
return 0;
}
if (!lu_device_is_md(&d->dd_lu_dev) && rc == -ENOSPC)
CERROR("%s: failed to start transaction due to ENOSPC. "
"Metadata overhead is underestimated or "
- "grant_ratio is too low.\n",
- osd->od_dt_dev.dd_lu_dev.ld_obd->obd_name);
+ "grant_ratio is too low.\n", osd->od_svname);
else
CERROR("%s: can't assign tx: rc = %d\n",
- osd->od_dt_dev.dd_lu_dev.ld_obd->obd_name, rc);
+ osd->od_svname, rc);
} else {
/* add commit callback */
dmu_tx_callback_register(oh->ot_tx, osd_trans_commit_cb, oh);
/*
* Concurrency: shouldn't matter.
*/
-static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
+static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
+ struct thandle *th)
{
struct osd_device *osd = osd_dt_dev(th->th_dev);
struct osd_thandle *oh;
/* there won't be any commit, release reserved quota space now,
* if any */
qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
- OBD_FREE_PTR(oh);
+ thandle_put(&oh->ot_super);
RETURN(0);
}
}
oh->ot_tx = tx;
- CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
- CFS_INIT_LIST_HEAD(&oh->ot_sa_list);
- cfs_sema_init(&oh->ot_sa_lock, 1);
+ INIT_LIST_HEAD(&oh->ot_dcb_list);
+ INIT_LIST_HEAD(&oh->ot_sa_list);
+ sema_init(&oh->ot_sa_lock, 1);
memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans));
th = &oh->ot_super;
th->th_dev = dt;
th->th_result = 0;
th->th_tags = LCT_TX_HANDLE;
+ atomic_set(&th->th_refc, 1);
+ th->th_alloc_size = sizeof(*oh);
RETURN(th);
}
ENTRY;
rc = udmu_objset_statfs(&osd->od_objset, osfs);
- if (rc)
+ if (unlikely(rc))
RETURN(rc);
osfs->os_bavail -= min_t(obd_size,
OSD_GRANT_FOR_LOCAL_OIDS / osfs->os_bsize,
const struct dt_device *dev,
struct dt_device_param *param)
{
+ struct osd_device *osd = osd_dt_dev(dev);
+
/*
* XXX should be taken from not-yet-existing fs abstraction layer.
*/
- param->ddp_max_name_len = MAXNAMELEN;
- param->ddp_max_nlink = 1 << 31; /* it's 8byte on a disk */
- param->ddp_block_shift = 12; /* XXX */
- param->ddp_mount_type = LDD_MT_ZFS;
+ param->ddp_max_name_len = MAXNAMELEN;
+ param->ddp_max_nlink = 1 << 31; /* it's 8byte on a disk */
+ param->ddp_block_shift = 12; /* XXX */
+ param->ddp_mount_type = LDD_MT_ZFS;
- param->ddp_mntopts = MNTOPT_USERXATTR | MNTOPT_ACL;
- param->ddp_max_ea_size = DXATTR_MAX_ENTRY_SIZE;
+ param->ddp_mntopts = MNTOPT_USERXATTR;
+ if (osd->od_posix_acl)
+ param->ddp_mntopts |= MNTOPT_ACL;
+ param->ddp_max_ea_size = DXATTR_MAX_ENTRY_SIZE;
/* for maxbytes, report same value as ZPL */
- param->ddp_maxbytes = MAX_LFS_FILESIZE;
+ param->ddp_maxbytes = MAX_LFS_FILESIZE;
/* Default reserved fraction of the available space that should be kept
* for error margin. Unfortunately, there are many factors that can
param->ddp_inodespace = OSD_DNODE_EST_COUNT;
/* per-fragment overhead to be used by the client code */
param->ddp_grant_frag = udmu_blk_insert_cost();
-
- param->ddp_mnt = NULL;
}
/*
tx_state_t *tx = &dmu_objset_pool(osd->od_objset.os)->dp_tx;
uint64_t txg;
+ mutex_enter(&tx->tx_sync_lock);
txg = tx->tx_open_txg + 1;
if (tx->tx_quiesce_txg_waiting < txg) {
tx->tx_quiesce_txg_waiting = txg;
RETURN(0);
}
+static void osd_xattr_changed_cb(void *arg, uint64_t newval)
+{
+ struct osd_device *osd = arg;
+ /*
+ * Registered by osd_mount() for the dataset's "xattr" property with
+ * the osd_device as callback argument: remember whether the new
+ * property value is ZFS_XATTR_SA.
+ */
+ osd->od_xattr_in_sa = (newval == ZFS_XATTR_SA);
+}
+
static int osd_mount(const struct lu_env *env,
struct osd_device *o, struct lustre_cfg *cfg)
{
- char *dev = lustre_cfg_string(cfg, 1);
- dmu_buf_t *rootdb;
- int rc;
+ struct dsl_dataset *ds;
+ char *mntdev = lustre_cfg_string(cfg, 1);
+ char *svname = lustre_cfg_string(cfg, 4);
+ dmu_buf_t *rootdb;
+ dsl_pool_t *dp;
+ const char *opts;
+ int rc;
ENTRY;
if (o->od_objset.os != NULL)
RETURN(0);
- if (strlen(dev) >= sizeof(o->od_mntdev))
+ if (mntdev == NULL || svname == NULL)
+ RETURN(-EINVAL);
+
+ rc = strlcpy(o->od_mntdev, mntdev, sizeof(o->od_mntdev));
+ if (rc >= sizeof(o->od_mntdev))
RETURN(-E2BIG);
- strcpy(o->od_mntdev, dev);
- strncpy(o->od_svname, lustre_cfg_string(cfg, 4),
- sizeof(o->od_svname) - 1);
+ rc = strlcpy(o->od_svname, svname, sizeof(o->od_svname));
+ if (rc >= sizeof(o->od_svname))
+ RETURN(-E2BIG);
+
+ if (server_name_is_ost(o->od_svname))
+ o->od_is_ost = 1;
rc = -udmu_objset_open(o->od_mntdev, &o->od_objset);
if (rc) {
RETURN(rc);
}
+ ds = dmu_objset_ds(o->od_objset.os);
+ dp = dmu_objset_pool(o->od_objset.os);
+ LASSERT(ds);
+ LASSERT(dp);
+ dsl_pool_config_enter(dp, FTAG);
+ rc = dsl_prop_register(ds, "xattr", osd_xattr_changed_cb, o);
+ dsl_pool_config_exit(dp, FTAG);
+ if (rc)
+ CERROR("%s: cat not register xattr callback, ignore: %d\n",
+ o->od_svname, rc);
+
rc = __osd_obj2dbuf(env, o->od_objset.os, o->od_objset.root,
&rootdb, root_tag);
if (rc) {
if (rc)
GOTO(err, rc);
+ rc = osd_convert_root_to_new_seq(env, o);
+ if (rc)
+ GOTO(err, rc);
+
/* Use our own ZAP for inode accounting by default, this can be changed
* via procfs to estimate the inode usage from the block usage */
o->od_quota_iused_est = 0;
o->arc_prune_cb = arc_add_prune_callback(arc_prune_func, o);
+ /* initialize quota slave instance */
+ o->od_quota_slave = qsd_init(env, o->od_svname, &o->od_dt_dev,
+ o->od_proc_entry);
+ if (IS_ERR(o->od_quota_slave)) {
+ rc = PTR_ERR(o->od_quota_slave);
+ o->od_quota_slave = NULL;
+ GOTO(err, rc);
+ }
+
+ /* parse mount option "noacl", and enable ACL by default */
+ opts = lustre_cfg_string(cfg, 3);
+ if (opts == NULL || strstr(opts, "noacl") == NULL)
+ o->od_posix_acl = 1;
+
err:
RETURN(rc);
}
{
ENTRY;
- if (cfs_atomic_read(&o->od_zerocopy_alloc))
+ if (atomic_read(&o->od_zerocopy_alloc))
CERROR("%s: lost %d allocated page(s)\n", o->od_svname,
- cfs_atomic_read(&o->od_zerocopy_alloc));
- if (cfs_atomic_read(&o->od_zerocopy_loan))
+ atomic_read(&o->od_zerocopy_alloc));
+ if (atomic_read(&o->od_zerocopy_loan))
CERROR("%s: lost %d loaned abuf(s)\n", o->od_svname,
- cfs_atomic_read(&o->od_zerocopy_loan));
- if (cfs_atomic_read(&o->od_zerocopy_pin))
+ atomic_read(&o->od_zerocopy_loan));
+ if (atomic_read(&o->od_zerocopy_pin))
CERROR("%s: lost %d pinned dbuf(s)\n", o->od_svname,
- cfs_atomic_read(&o->od_zerocopy_pin));
+ atomic_read(&o->od_zerocopy_pin));
if (o->od_objset.os != NULL)
udmu_objset_close(&o->od_objset);
struct lu_device *d)
{
struct osd_device *o = osd_dev(d);
+ struct dsl_dataset *ds;
int rc;
ENTRY;
+ osd_shutdown(env, o);
osd_oi_fini(env, o);
if (o->od_objset.os) {
- arc_remove_prune_callback(o->arc_prune_cb);
- o->arc_prune_cb = NULL;
+ ds = dmu_objset_ds(o->od_objset.os);
+ rc = dsl_prop_unregister(ds, "xattr", osd_xattr_changed_cb, o);
+ if (rc)
+ CERROR("%s: dsl_prop_unregister xattr error %d\n",
+ o->od_svname, rc);
+ if (o->arc_prune_cb != NULL) {
+ arc_remove_prune_callback(o->arc_prune_cb);
+ o->arc_prune_cb = NULL;
+ }
osd_sync(env, lu2dt_dev(d));
txg_wait_callbacks(spa_get_dsl(dmu_objset_spa(o->od_objset.os)));
}
struct lu_device *d, struct lustre_cfg *cfg)
{
struct osd_device *o = osd_dev(d);
- int err;
+ int rc;
ENTRY;
switch(cfg->lcfg_command) {
case LCFG_SETUP:
- err = osd_mount(env, o, cfg);
+ rc = osd_mount(env, o, cfg);
break;
case LCFG_CLEANUP:
- err = osd_shutdown(env, o);
+ rc = osd_shutdown(env, o);
+ break;
+ case LCFG_PARAM: {
+ LASSERT(&o->od_dt_dev);
+ rc = class_process_proc_seq_param(PARAM_OSD,
+ lprocfs_osd_obd_vars, cfg,
+ &o->od_dt_dev);
+ if (rc > 0 || rc == -ENOSYS)
+ rc = class_process_proc_seq_param(PARAM_OST,
+ lprocfs_osd_obd_vars,
+ cfg, &o->od_dt_dev);
break;
+ }
default:
- err = -ENOTTY;
+ rc = -ENOTTY;
}
- RETURN(err);
+ RETURN(rc);
}
static int osd_recovery_complete(const struct lu_env *env, struct lu_device *d)
{
+ struct osd_device *osd = osd_dev(d);
+ int rc = 0;
ENTRY;
- RETURN(0);
+ /* without a quota slave instance there is nothing to start */
+ if (osd->od_quota_slave == NULL)
+ RETURN(0);
+
+ /* Start the qsd instance on recovery completion; this notifies the
+ * quota slave code that new requests are about to be processed.
+ * The result of qsd_start() is handed back to the caller. */
+ rc = qsd_start(env, osd->od_quota_slave);
+ RETURN(rc);
}
/*
*exp = class_conn2export(&conn);
- cfs_spin_lock(&osd->od_objset.lock);
+ spin_lock(&osd->od_objset.lock);
osd->od_connects++;
- cfs_spin_unlock(&osd->od_objset.lock);
+ spin_unlock(&osd->od_objset.lock);
RETURN(0);
}
ENTRY;
/* Only disconnect the underlying layers on the final disconnect. */
- cfs_spin_lock(&osd->od_objset.lock);
+ spin_lock(&osd->od_objset.lock);
osd->od_connects--;
if (osd->od_connects == 0)
release = 1;
- cfs_spin_unlock(&osd->od_objset.lock);
+ spin_unlock(&osd->od_objset.lock);
rc = class_disconnect(exp); /* bz 9811 */
int rc = 0;
ENTRY;
- /* initialize quota slave instance */
- osd->od_quota_slave = qsd_init(env, osd->od_svname, &osd->od_dt_dev,
- osd->od_proc_entry);
- if (IS_ERR(osd->od_quota_slave)) {
- rc = PTR_ERR(osd->od_quota_slave);
- osd->od_quota_slave = NULL;
- }
+ if (osd->od_quota_slave != NULL)
+ /* set up quota slave objects */
+ rc = qsd_prepare(env, osd->od_quota_slave);
RETURN(rc);
}
{
}
+int osd_fid_alloc(const struct lu_env *env, struct obd_export *exp,
+ struct lu_fid *fid, struct md_op_data *op_data)
+{
+ struct osd_device *osd = osd_dev(exp->exp_obd->obd_lu_dev);
+ /*
+ * Allocate the FID from the client sequence (od_cl_seq) of the OSD
+ * device behind the export; op_data is not used here.
+ */
+ return seq_client_alloc_fid(env, osd->od_cl_seq, fid);
+}
+
static struct lu_device_type_operations osd_device_type_ops = {
.ldto_init = osd_type_init,
.ldto_fini = osd_type_fini,
static struct obd_ops osd_obd_device_ops = {
.o_owner = THIS_MODULE,
.o_connect = osd_obd_connect,
- .o_disconnect = osd_obd_disconnect
+ .o_disconnect = osd_obd_disconnect,
+ .o_fid_alloc = osd_fid_alloc /* FID allocation via the OSD's client sequence */
};
int __init osd_init(void)
if (rc)
return rc;
- rc = class_register_type(&osd_obd_device_ops, NULL,
- lprocfs_osd_module_vars,
+ rc = class_register_type(&osd_obd_device_ops, NULL, true, NULL,
+#ifndef HAVE_ONLY_PROCFS_SEQ
+ NULL,
+#endif
LUSTRE_OSD_ZFS_NAME, &osd_device_type);
if (rc)
lu_kmem_fini(osd_caches);