* Use is subject to license terms.
*/
/*
- * Copyright (c) 2011, 2012 Whamcloud, Inc.
+ * Copyright (c) 2012, 2013, Intel Corporation.
* Use is subject to license terms.
*
*/
* Author: Johann Lombardi <johann@whamcloud.com>
*/
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
#define DEBUG_SUBSYSTEM S_OSD
#include <lustre_ver.h>
#include <obd_class.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
+#include <md_object.h>
#include "osd_internal.h"
static char *root_tag = "osd_mount, rootdb";
/* Slab for OSD object allocation */
-cfs_mem_cache_t *osd_object_kmem;
+struct kmem_cache *osd_object_kmem;
static struct lu_kmem_descr osd_caches[] = {
{
{
struct osd_thandle *oh = cb_data;
struct thandle *th = &oh->ot_super;
+ struct osd_device *osd = osd_dt_dev(th->th_dev);
struct lu_device *lud = &th->th_dev->dd_lu_dev;
struct dt_txn_commit_cb *dcb, *tmp;
cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
dcb->dcb_func(NULL, th, dcb, error);
+ /* Unlike ldiskfs, zfs updates space accounting at commit time.
+ * As a consequence, op_end is called only now to inform the quota slave
+ * component that reserved quota space is now accounted in usage and
+ * should be released. Quota space won't be adjusted at this point since
+ * we can't provide a suitable environment. The adjustment will be done
+ * asynchronously by a dedicated lquota thread. */
+ qsd_op_end(NULL, osd->od_quota_slave, &oh->ot_quota_trans);
+
lu_device_put(lud);
th->th_dev = NULL;
lu_context_exit(&th->th_ctx);
if (!lu_device_is_md(&d->dd_lu_dev) && rc == -ENOSPC)
CERROR("%s: failed to start transaction due to ENOSPC. "
"Metadata overhead is underestimated or "
- "grant_ratio is too low.\n",
- osd->od_dt_dev.dd_lu_dev.ld_obd->obd_name);
+ "grant_ratio is too low.\n", osd->od_svname);
else
CERROR("%s: can't assign tx: rc = %d\n",
- osd->od_dt_dev.dd_lu_dev.ld_obd->obd_name, rc);
+ osd->od_svname, rc);
} else {
/* add commit callback */
dmu_tx_callback_register(oh->ot_tx, osd_trans_commit_cb, oh);
LASSERT(oh->ot_tx);
dmu_tx_abort(oh->ot_tx);
osd_object_sa_dirty_rele(oh);
+ /* there won't be any commit, release reserved quota space now,
+ * if any */
+ qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
OBD_FREE_PTR(oh);
RETURN(0);
}
+ /* When doing our own inode accounting, the ZAPs storing per-uid/gid
+ * usage are updated at operation execution time, so we should call
+ * qsd_op_end() straight away. Otherwise (for blk accounting maintained
+ * by ZFS and when #inode is estimated from #blks) accounting is updated
+ * at commit time and the call to qsd_op_end() must be delayed */
+ if (oh->ot_quota_trans.lqt_id_cnt > 0 &&
+ !oh->ot_quota_trans.lqt_ids[0].lqi_is_blk &&
+ !osd->od_quota_iused_est)
+ qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
+
rc = dt_txn_hook_stop(env, th);
if (rc != 0)
CDEBUG(D_OTHER, "%s: transaction hook failed: rc = %d\n",
oh->ot_tx = tx;
CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
CFS_INIT_LIST_HEAD(&oh->ot_sa_list);
- cfs_sema_init(&oh->ot_sa_lock, 1);
+ sema_init(&oh->ot_sa_lock, 1);
+ memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans));
th = &oh->ot_super;
th->th_dev = dt;
th->th_result = 0;
ENTRY;
rc = udmu_objset_statfs(&osd->od_objset, osfs);
- if (rc)
+ if (unlikely(rc))
RETURN(rc);
osfs->os_bavail -= min_t(obd_size,
OSD_GRANT_FOR_LOCAL_OIDS / osfs->os_bsize,
tx_state_t *tx = &dmu_objset_pool(osd->od_objset.os)->dp_tx;
uint64_t txg;
+ mutex_enter(&tx->tx_sync_lock);
txg = tx->tx_open_txg + 1;
if (tx->tx_quiesce_txg_waiting < txg) {
tx->tx_quiesce_txg_waiting = txg;
RETURN(0);
}
+static void osd_xattr_changed_cb(void *arg, uint64_t newval)
+{
+ struct osd_device *osd = arg;
+
+ osd->od_xattr_in_sa = (newval == ZFS_XATTR_SA);
+}
+
static int osd_mount(const struct lu_env *env,
struct osd_device *o, struct lustre_cfg *cfg)
{
+ struct dsl_dataset *ds;
char *dev = lustre_cfg_string(cfg, 1);
dmu_buf_t *rootdb;
int rc;
RETURN(rc);
}
+ ds = dmu_objset_ds(o->od_objset.os);
+ LASSERT(ds);
+ rc = dsl_prop_register(ds, "xattr", osd_xattr_changed_cb, o);
+ if (rc)
+ CERROR("%s: cannot register xattr callback, ignore: %d\n",
+ o->od_svname, rc);
+
rc = __osd_obj2dbuf(env, o->od_objset.os, o->od_objset.root,
&rootdb, root_tag);
if (rc) {
rc = lu_site_init(&o->od_site, osd2lu_dev(o));
if (rc)
GOTO(err, rc);
+ o->od_site.ls_bottom_dev = osd2lu_dev(o);
rc = lu_site_init_finish(&o->od_site);
if (rc)
GOTO(err, rc);
+ rc = osd_convert_root_to_new_seq(env, o);
+ if (rc)
+ GOTO(err, rc);
+
/* Use our own ZAP for inode accounting by default, this can be changed
* via procfs to estimate the inode usage from the block usage */
o->od_quota_iused_est = 0;
o->arc_prune_cb = arc_add_prune_callback(arc_prune_func, o);
+ /* initialize quota slave instance */
+ o->od_quota_slave = qsd_init(env, o->od_svname, &o->od_dt_dev,
+ o->od_proc_entry);
+ if (IS_ERR(o->od_quota_slave)) {
+ rc = PTR_ERR(o->od_quota_slave);
+ o->od_quota_slave = NULL;
+ GOTO(err, rc);
+ }
err:
RETURN(rc);
}
struct lu_device *d)
{
struct osd_device *o = osd_dev(d);
+ struct dsl_dataset *ds;
int rc;
ENTRY;
+ osd_shutdown(env, o);
osd_oi_fini(env, o);
if (o->od_objset.os) {
+ ds = dmu_objset_ds(o->od_objset.os);
+ rc = dsl_prop_unregister(ds, "xattr", osd_xattr_changed_cb, o);
+ if (rc)
+ CERROR("%s: dsl_prop_unregister xattr error %d\n",
+ o->od_svname, rc);
arc_remove_prune_callback(o->arc_prune_cb);
o->arc_prune_cb = NULL;
osd_sync(env, lu2dt_dev(d));
static int osd_recovery_complete(const struct lu_env *env, struct lu_device *d)
{
+ struct osd_device *osd = osd_dev(d);
+ int rc = 0;
ENTRY;
- RETURN(0);
+
+ if (osd->od_quota_slave == NULL)
+ RETURN(0);
+
+ /* start qsd instance on recovery completion, this notifies the quota
+ * slave code that we are about to process new requests now */
+ rc = qsd_start(env, osd->od_quota_slave);
+ RETURN(rc);
}
/*
*exp = class_conn2export(&conn);
- cfs_spin_lock(&osd->od_objset.lock);
+ spin_lock(&osd->od_objset.lock);
osd->od_connects++;
- cfs_spin_unlock(&osd->od_objset.lock);
+ spin_unlock(&osd->od_objset.lock);
RETURN(0);
}
ENTRY;
/* Only disconnect the underlying layers on the final disconnect. */
- cfs_spin_lock(&osd->od_objset.lock);
+ spin_lock(&osd->od_objset.lock);
osd->od_connects--;
if (osd->od_connects == 0)
release = 1;
- cfs_spin_unlock(&osd->od_objset.lock);
+ spin_unlock(&osd->od_objset.lock);
rc = class_disconnect(exp); /* bz 9811 */
int rc = 0;
ENTRY;
- /* initialize quota slave instance */
- osd->od_quota_slave = qsd_init(env, osd->od_svname, &osd->od_dt_dev,
- osd->od_proc_entry);
- if (IS_ERR(osd->od_quota_slave)) {
- rc = PTR_ERR(osd->od_quota_slave);
- osd->od_quota_slave = NULL;
- }
+ if (osd->od_quota_slave != NULL)
+ /* set up quota slave objects */
+ rc = qsd_prepare(env, osd->od_quota_slave);
RETURN(rc);
}