AC_MSG_RESULT([ACL size $acl_size])
AC_DEFINE_UNQUOTED(XATTR_ACL_SIZE, AS_TR_SH([$acl_size]), [size of xattr acl])
],[
- AC_ERROR([ACL size can't computed])
+ AC_ERROR([ACL size can't be computed])
+],[
+ AC_MSG_RESULT([can't check ACL size, make it 260])
+ AC_DEFINE_UNQUOTED(XATTR_ACL_SIZE,260)
])
CFLAGS="$tmp_flags"
])
RETURN(rc);
}
-static dt_obj_version_t cml_version_get(const struct lu_env *env,
- struct md_object *mo)
-{
- return mo_version_get(env, md_object_next(mo));
-}
-
-static void cml_version_set(const struct lu_env *env, struct md_object *mo,
- dt_obj_version_t version)
-{
- return mo_version_set(env, md_object_next(mo), version);
-}
-
static const struct md_object_operations cml_mo_ops = {
.moo_permission = cml_permission,
.moo_attr_get = cml_attr_get,
.moo_changelog = cml_changelog,
.moo_capa_get = cml_capa_get,
.moo_object_sync = cml_object_sync,
- .moo_version_get = cml_version_get,
- .moo_version_set = cml_version_set,
.moo_path = cml_path,
.moo_file_lock = cml_file_lock,
.moo_file_unlock = cml_file_unlock,
return -EREMOTE;
}
-/**
- * cmr moo_version_get().
- */
-static dt_obj_version_t cmr_version_get(const struct lu_env *env,
- struct md_object *mo)
-{
- /** Don't check remote object version */
- return 0;
-}
-
-
-/**
- * cmr moo_version_set().
- * No need to update remote object version here, it is done as a part
- * of reintegration of partial operation on the remote server.
- */
-static void cmr_version_set(const struct lu_env *env, struct md_object *mo,
- dt_obj_version_t version)
-{
- return;
-}
-
/** Set of md_object_operations for cmr. */
static const struct md_object_operations cmr_mo_ops = {
.moo_permission = cmr_permission,
.moo_changelog = cmr_changelog,
.moo_capa_get = cmr_capa_get,
.moo_object_sync = cmr_object_sync,
- .moo_version_get = cmr_version_get,
- .moo_version_set = cmr_version_set,
.moo_path = cmr_path,
.moo_file_lock = cmr_file_lock,
.moo_file_unlock = cmr_file_unlock,
/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#ifdef __KERNEL__
struct seq_thread_info {
struct req_capsule *sti_pill;
- struct txn_param sti_txn;
struct lu_seq_range sti_space;
struct lu_buf sti_buf;
};
/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
OBD_FREE_PTR(ccb);
}
+struct thandle *seq_store_trans_create(struct lu_server_seq *seq,
+ const struct lu_env *env)
+{
+ struct dt_device *dt_dev;
+
+ dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+ return dt_trans_create(env, dt_dev);
+}
+
+int seq_store_trans_start(struct lu_server_seq *seq, const struct lu_env *env,
+ struct thandle *th)
+{
+ struct dt_device *dt_dev;
+ ENTRY;
+
+ dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+
+ return dt_trans_start(env, dt_dev, th);
+}
+
int seq_update_cb_add(struct thandle *th, struct lu_server_seq *seq)
{
struct seq_update_callback *ccb;
return rc;
}
+int seq_declare_store_write(struct lu_server_seq *seq,
+ const struct lu_env *env,
+ struct thandle *th)
+{
+ struct dt_object *dt_obj = seq->lss_obj;
+ int rc;
+ ENTRY;
+
+ rc = dt_obj->do_body_ops->dbo_declare_write(env, dt_obj,
+ sizeof(struct lu_seq_range),
+ 0, th);
+ return rc;
+}
+
/* This function implies that caller takes care about locking. */
int seq_store_write(struct lu_server_seq *seq,
const struct lu_env *env,
int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
struct lu_seq_range *out, int sync)
{
- struct seq_thread_info *info;
struct dt_device *dt_dev;
struct thandle *th;
int rc;
- int credits = SEQ_TXN_STORE_CREDITS;
ENTRY;
dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
- info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
- if (out != NULL)
- credits += FLD_TXN_INDEX_INSERT_CREDITS;
-
- txn_param_init(&info->sti_txn, credits);
- th = dt_trans_start(env, dt_dev, &info->sti_txn);
+ th = seq_store_trans_create(seq, env);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
+ rc = seq_declare_store_write(seq, env, th);
+ if (rc)
+ GOTO(exit, rc);
+
+ if (out != NULL) {
+ rc = fld_declare_server_create(seq->lss_site->ms_server_fld,
+ env, th);
+ if (rc)
+ GOTO(exit, rc);
+ }
+
+ rc = seq_store_trans_start(seq, env, th);
+ if (rc)
+ GOTO(exit, rc);
+
rc = seq_store_write(seq, env, th);
if (rc) {
CERROR("%s: Can't write space data, rc %d\n",
seq->lss_name, rc);
- GOTO(out,rc);
+ GOTO(exit,rc);
} else if (out != NULL) {
rc = fld_server_create(seq->lss_site->ms_server_fld,
env, out, th);
if (rc) {
CERROR("%s: Can't Update fld database, rc %d\n",
seq->lss_name, rc);
- GOTO(out,rc);
+ GOTO(exit,rc);
}
}
sync = !!seq_update_cb_add(th, seq);
th->th_sync |= sync;
-out:
+exit:
dt_trans_stop(env, dt_dev, th);
return rc;
}
/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
}
}
+int fld_declare_server_create(struct lu_server_fld *fld,
+ const struct lu_env *env,
+ struct thandle *th)
+{
+ struct dt_object *dt_obj = fld->lsf_obj;
+ int rc;
+
+ ENTRY;
+
+ /* for ldiskfs OSD it's enough to declare operation with any ops
+ * with DMU we'll probably need to specify exact key/value */
+ rc = dt_obj->do_index_ops->dio_declare_delete(env, dt_obj, NULL, th);
+ if (rc)
+ GOTO(out, rc);
+ rc = dt_obj->do_index_ops->dio_declare_delete(env, dt_obj, NULL, th);
+ if (rc)
+ GOTO(out, rc);
+ rc = dt_obj->do_index_ops->dio_declare_insert(env, dt_obj,
+ NULL, NULL, th);
+out:
+ RETURN(rc);
+}
+EXPORT_SYMBOL(fld_declare_server_create);
+
/**
* Insert FLD index entry and update FLD cache.
*
RETURN(rc);
}
-
EXPORT_SYMBOL(fld_server_create);
/**
/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
RETURN((void *)rec);
}
-struct thandle* fld_trans_start(struct lu_server_fld *fld,
- const struct lu_env *env, int credit)
+struct thandle *fld_trans_create(struct lu_server_fld *fld,
+ const struct lu_env *env)
{
- struct fld_thread_info *info;
struct dt_device *dt_dev;
- struct txn_param *p;
dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
- info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
- p = &info->fti_txn_param;
- txn_param_init(p, credit);
- return dt_dev->dd_ops->dt_trans_start(env, dt_dev, p);
+ return dt_dev->dd_ops->dt_trans_create(env, dt_dev);
+}
+
+int fld_trans_start(struct lu_server_fld *fld,
+ const struct lu_env *env, struct thandle *th)
+{
+ struct dt_device *dt_dev;
+
+ dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
+
+ return dt_dev->dd_ops->dt_trans_start(env, dt_dev, th);
}
void fld_trans_stop(struct lu_server_fld *fld,
dt_dev->dd_ops->dt_trans_stop(env, th);
}
+int fld_declare_index_create(struct lu_server_fld *fld,
+ const struct lu_env *env,
+ const struct lu_seq_range *range,
+ struct thandle *th)
+{
+ struct dt_object *dt_obj = fld->lsf_obj;
+ seqno_t start;
+ int rc;
+
+ ENTRY;
+
+ start = range->lsr_start;
+ LASSERT(range_is_sane(range));
+
+ rc = dt_obj->do_index_ops->dio_declare_insert(env, dt_obj,
+ fld_rec(env, range),
+ fld_key(env, start), th);
+ RETURN(rc);
+}
+
/**
* insert range in fld store.
*
ENTRY;
- rc = dt_obj->do_index_ops->dio_delete(env, dt_obj,
- fld_key(env, seq), th,
- BYPASS_CAPA);
+ rc = dt_obj->do_index_ops->dio_delete(env, dt_obj, fld_key(env, seq),
+ th, BYPASS_CAPA);
CDEBUG(D_INFO, "%s: delete given range : "DRANGE" rc = %d\n",
fld->lsf_name, PRANGE(range), rc);
{
struct thandle *th;
int rc;
-
ENTRY;
- th = fld_trans_start(fld, env, FLD_TXN_INDEX_INSERT_CREDITS);
+
+ /* FLD_TXN_INDEX_INSERT_CREDITS */
+ th = fld_trans_create(fld, env);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
+ rc = fld_declare_index_create(fld, env, &IGIF_FLD_RANGE, th);
+ if (rc) {
+ fld_trans_stop(fld, env, th);
+ RETURN(rc);
+ }
+ rc = fld_trans_start(fld, env, th);
+ if (rc) {
+ fld_trans_stop(fld, env, th);
+ RETURN(rc);
+ }
rc = fld_index_create(fld, env, &IGIF_FLD_RANGE, th);
fld_trans_stop(fld, env, th);
/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
struct lu_seq_range fti_rec;
struct lu_seq_range fti_lrange;
struct lu_seq_range fti_irange;
- struct txn_param fti_txn_param;
};
-struct thandle* fld_trans_start(struct lu_server_fld *fld,
- const struct lu_env *env, int credit);
+struct thandle *fld_trans_create(struct lu_server_fld *fld,
+ const struct lu_env *env);
+int fld_trans_start(struct lu_server_fld *fld,
+ const struct lu_env *env, struct thandle *th);
void fld_trans_stop(struct lu_server_fld *fld,
const struct lu_env *env, struct thandle* th);
struct lustre_cfg;
struct thandle;
-struct txn_param;
struct dt_device;
struct dt_object;
struct dt_index_features;
struct dt_quota_ctxt;
+struct niobuf_local;
+struct niobuf_remote;
typedef enum {
MNTOPT_USERXATTR = 0x00000001,
unsigned ddp_block_shift;
mntopt_t ddp_mntopts;
unsigned ddp_max_ea_size;
+ void *ddp_mnt; /* XXX: old code can retrieve mnt -bzzz */
+ int ddp_mount_type;
+ unsigned long long ddp_maxbytes;
+ /* percentage of available space to reserve for grant error margin */
+ int ddp_grant_reserved;
+ /* per-inode space consumption */
+ short ddp_inodespace;
+ /* per-fragment grant overhead to be used by client for grant
+ * calculation */
+ int ddp_grant_frag;
};
/**
};
/**
- * Basic transaction credit op
- */
-enum dt_txn_op {
- DTO_INDEX_INSERT,
- DTO_INDEX_DELETE,
- DTO_IDNEX_UPDATE,
- DTO_OBJECT_CREATE,
- DTO_OBJECT_DELETE,
- DTO_ATTR_SET_BASE,
- DTO_XATTR_SET,
- DTO_LOG_REC, /**< XXX temporary: dt layer knows nothing about llog. */
- DTO_WRITE_BASE,
- DTO_WRITE_BLOCK,
- DTO_ATTR_SET_CHOWN,
-
- DTO_NR
-};
-
-/**
* Operations on dt device.
*/
struct dt_device_operations {
* Return device-wide statistics.
*/
int (*dt_statfs)(const struct lu_env *env,
- struct dt_device *dev, cfs_kstatfs_t *sfs);
+ struct dt_device *dev, cfs_kstatfs_t *osfs);
+ /**
+ * Create transaction, described by \a param.
+ */
+ struct thandle *(*dt_trans_create)(const struct lu_env *env,
+ struct dt_device *dev);
/**
* Start transaction, described by \a param.
*/
- struct thandle *(*dt_trans_start)(const struct lu_env *env,
- struct dt_device *dev,
- struct txn_param *param);
+ int (*dt_trans_start)(const struct lu_env *env,
+ struct dt_device *dev, struct thandle *th);
/**
* Finish previously started transaction.
*/
- void (*dt_trans_stop)(const struct lu_env *env,
+ int (*dt_trans_stop)(const struct lu_env *env,
struct thandle *th);
/**
* Add commit callback to the transaction.
void (*dt_init_quota_ctxt)(const struct lu_env *env,
struct dt_device *dev,
struct dt_quota_ctxt *ctxt, void *data);
-
- /**
- * get transaction credits for given \a op.
- */
- int (*dt_credit_get)(const struct lu_env *env, struct dt_device *dev,
- enum dt_txn_op);
};
struct dt_index_features {
enum dt_format_type dt_mode_to_dft(__u32 mode);
-/** Version type. May differ in DMU and ldiskfs */
typedef __u64 dt_obj_version_t;
/**
*
* precondition: dt_object_exists(dt);
*/
+ int (*do_declare_attr_set)(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_attr *attr,
+ struct thandle *handle);
int (*do_attr_set)(const struct lu_env *env,
struct dt_object *dt,
const struct lu_attr *attr,
*
* precondition: dt_object_exists(dt);
*/
+ int (*do_declare_xattr_set)(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_buf *buf,
+ const char *name, int fl,
+ struct thandle *handle);
int (*do_xattr_set)(const struct lu_env *env,
struct dt_object *dt, const struct lu_buf *buf,
const char *name, int fl, struct thandle *handle,
*
* precondition: dt_object_exists(dt);
*/
+ int (*do_declare_xattr_del)(const struct lu_env *env,
+ struct dt_object *dt,
+ const char *name, struct thandle *handle);
int (*do_xattr_del)(const struct lu_env *env,
struct dt_object *dt,
const char *name, struct thandle *handle,
* precondition: !dt_object_exists(dt);
* postcondition: ergo(result == 0, dt_object_exists(dt));
*/
+ int (*do_declare_create)(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_attr *attr,
+ struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
+ struct thandle *th);
int (*do_create)(const struct lu_env *env, struct dt_object *dt,
struct lu_attr *attr,
struct dt_allocation_hint *hint,
struct thandle *th);
/**
+ Destroy object on this device
+ * precondition: !dt_object_exists(dt);
+ * postcondition: ergo(result == 0, dt_object_exists(dt));
+ */
+ int (*do_declare_destroy)(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th);
+ int (*do_destroy)(const struct lu_env *env, struct dt_object *dt,
+ struct thandle *th);
+
+ /**
* Announce that this object is going to be used as an index. This
* operation check that object supports indexing operations and
* installs appropriate dt_index_operations vector on success.
* Add nlink of the object
* precondition: dt_object_exists(dt);
*/
- void (*do_ref_add)(const struct lu_env *env,
+ int (*do_declare_ref_add)(const struct lu_env *env,
+ struct dt_object *dt, struct thandle *th);
+ int (*do_ref_add)(const struct lu_env *env,
struct dt_object *dt, struct thandle *th);
/**
* Del nlink of the object
* precondition: dt_object_exists(dt);
*/
- void (*do_ref_del)(const struct lu_env *env,
+ int (*do_declare_ref_del)(const struct lu_env *env,
+ struct dt_object *dt, struct thandle *th);
+ int (*do_ref_del)(const struct lu_env *env,
struct dt_object *dt, struct thandle *th);
struct obd_capa *(*do_capa_get)(const struct lu_env *env,
struct lustre_capa *old,
__u64 opc);
int (*do_object_sync)(const struct lu_env *, struct dt_object *);
- dt_obj_version_t (*do_version_get)(const struct lu_env *env,
- struct dt_object *dt);
- void (*do_version_set)(const struct lu_env *env, struct dt_object *dt,
- dt_obj_version_t new_version);
/**
* Get object info of next level. Currently, only get inode from osd.
* This is only used by quota b=16542
/**
* precondition: dt_object_exists(dt);
*/
+ ssize_t (*dbo_declare_write)(const struct lu_env *env,
+ struct dt_object *dt,
+ const loff_t size, loff_t pos,
+ struct thandle *handle);
ssize_t (*dbo_write)(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, loff_t *pos,
struct thandle *handle, struct lustre_capa *capa,
int ignore_quota);
+ /*
+ * methods for zero-copy IO
+ */
+
+ /*
+ * precondition: dt_object_exists(dt);
+ * returns:
+ * < 0 - error code
+ * = 0 - illegal
+ * > 0 - number of local buffers prepared
+ */
+ int (*dbo_bufs_get)(const struct lu_env *env, struct dt_object *dt,
+ loff_t pos, ssize_t len, struct niobuf_local *lb,
+ int rw, struct lustre_capa *capa);
+ /*
+ * precondition: dt_object_exists(dt);
+ */
+ int (*dbo_bufs_put)(const struct lu_env *env, struct dt_object *dt,
+ struct niobuf_local *lb, int nr);
+ /*
+ * precondition: dt_object_exists(dt);
+ */
+ int (*dbo_write_prep)(const struct lu_env *env, struct dt_object *dt,
+ struct niobuf_local *lb, int nr);
+ /*
+ * precondition: dt_object_exists(dt);
+ */
+ int (*dbo_declare_write_commit)(const struct lu_env *env,
+ struct dt_object *dt,
+ struct niobuf_local *,
+ int, struct thandle *);
+ /*
+ * precondition: dt_object_exists(dt);
+ */
+ int (*dbo_write_commit)(const struct lu_env *env, struct dt_object *dt,
+ struct niobuf_local *, int, struct thandle *);
+ /*
+ * precondition: dt_object_exists(dt);
+ */
+ int (*dbo_read_prep)(const struct lu_env *env, struct dt_object *dt,
+ struct niobuf_local *lnb, int nr);
+ int (*dbo_fiemap_get)(const struct lu_env *env, struct dt_object *dt,
+ struct ll_user_fiemap *fm);
+ /**
+ * Punch object's content
+ * precondition: regular object, not index
+ */
+ int (*do_declare_punch)(const struct lu_env *, struct dt_object *,
+ __u64, __u64, struct thandle *th);
+ int (*do_punch)(const struct lu_env *env, struct dt_object *dt,
+ __u64 start, __u64 end, struct thandle *th,
+ struct lustre_capa *capa);
};
/**
/**
* precondition: dt_object_exists(dt);
*/
+ int (*dio_declare_insert)(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_rec *rec,
+ const struct dt_key *key,
+ struct thandle *handle);
int (*dio_insert)(const struct lu_env *env, struct dt_object *dt,
const struct dt_rec *rec, const struct dt_key *key,
struct thandle *handle, struct lustre_capa *capa,
/**
* precondition: dt_object_exists(dt);
*/
+ int (*dio_declare_delete)(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_key *key,
+ struct thandle *handle);
int (*dio_delete)(const struct lu_env *env, struct dt_object *dt,
const struct dt_key *key, struct thandle *handle,
struct lustre_capa *capa);
const struct dt_it *di);
int (*rec)(const struct lu_env *env,
const struct dt_it *di,
- struct lu_dirent *lde,
+ struct dt_rec *rec,
__u32 attr);
__u64 (*store)(const struct lu_env *env,
const struct dt_it *di);
int (*load)(const struct lu_env *env,
const struct dt_it *di, __u64 hash);
+ int (*key_rec)(const struct lu_env *env,
+ const struct dt_it *di, void* key_rec);
} dio_it;
};
const struct dt_index_operations *do_index_ops;
};
+static inline struct dt_object *lu2dt(struct lu_object *l)
+{
+ LASSERT(l == NULL || IS_ERR(l) || lu_device_is_dt(l->lo_dev));
+ return container_of0(l, struct dt_object, do_lu);
+}
+
int dt_object_init(struct dt_object *obj,
struct lu_object_header *h, struct lu_device *d);
return lu_object_exists(&dt->do_lu);
}
-struct txn_param {
- /** number of blocks this transaction will modify */
- unsigned int tp_credits;
-};
-
-static inline void txn_param_init(struct txn_param *p, unsigned int credits)
-{
- memset(p, 0, sizeof(*p));
- p->tp_credits = credits;
-}
-
-static inline void txn_param_credit_add(struct txn_param *p,
- unsigned int credits)
-{
- p->tp_credits += credits;
-}
-
/**
* This is the general purpose transaction handle.
* 1. Transaction Life Cycle
/** the dt device on which the transactions are executed */
struct dt_device *th_dev;
+ /** additional tags (layers can add in declare) */
+ __u32 th_tags;
+
/** context for this transaction, tag is LCT_TX_HANDLE */
struct lu_context th_ctx;
/** the last operation result in this transaction.
* this value is used in recovery */
__s32 th_result;
+
/** whether we need sync commit */
- int th_sync;
+ int th_sync:1;
+
+ /* local transation, no need to inform other layers */
+ int th_local:1;
};
/**
*/
struct dt_txn_callback {
int (*dtc_txn_start)(const struct lu_env *env,
- struct txn_param *param, void *cookie);
+ struct thandle *txn, void *cookie);
int (*dtc_txn_stop)(const struct lu_env *env,
struct thandle *txn, void *cookie);
void (*dtc_txn_commit)(struct thandle *txn, void *cookie);
void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb);
int dt_txn_hook_start(const struct lu_env *env,
- struct dt_device *dev, struct txn_param *param);
+ struct dt_device *dev, struct thandle *txn);
int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn);
void dt_txn_hook_commit(struct thandle *txn);
struct dt_device *dev,
const struct lu_fid *fid);
-static inline dt_obj_version_t do_version_get(const struct lu_env *env,
- struct dt_object *o)
-{
- LASSERT(o->do_ops->do_version_get);
- return o->do_ops->do_version_get(env, o);
-}
+int dt_declare_version_set(const struct lu_env *env, struct dt_object *o,
+ struct thandle *th);
+void dt_version_set(const struct lu_env *env, struct dt_object *o,
+ dt_obj_version_t version, struct thandle *th);
+dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o);
-static inline void do_version_set(const struct lu_env *env,
- struct dt_object *o, dt_obj_version_t v)
-{
- LASSERT(o->do_ops->do_version_set);
- return o->do_ops->do_version_set(env, o, v);
-}
int dt_record_read(const struct lu_env *env, struct dt_object *dt,
struct lu_buf *buf, loff_t *pos);
const struct lu_buf *buf, loff_t *pos, struct thandle *th);
-static inline struct thandle *dt_trans_start(const struct lu_env *env,
- struct dt_device *d,
- struct txn_param *p)
+static inline struct thandle *dt_trans_create(const struct lu_env *env,
+ struct dt_device *d)
{
- LASSERT(d->dd_ops->dt_trans_start);
- return d->dd_ops->dt_trans_start(env, d, p);
+ LASSERT(d->dd_ops->dt_trans_create);
+ return d->dd_ops->dt_trans_create(env, d);
}
-static inline void dt_trans_stop(const struct lu_env *env,
+static inline int dt_trans_start(const struct lu_env *env,
struct dt_device *d, struct thandle *th)
{
+ LASSERT(d->dd_ops->dt_trans_start);
+ return d->dd_ops->dt_trans_start(env, d, th);
+}
+
+/* for this transaction hooks shouldn't be called */
+static inline int dt_trans_start_local(const struct lu_env *env,
+ struct dt_device *d, struct thandle *th)
+{
+ LASSERT(d->dd_ops->dt_trans_start);
+ th->th_local = 1;
+ return d->dd_ops->dt_trans_start(env, d, th);
+}
+
+static inline int dt_trans_stop(const struct lu_env *env,
+ struct dt_device *d, struct thandle *th)
+{
LASSERT(d->dd_ops->dt_trans_stop);
return d->dd_ops->dt_trans_stop(env, th);
}
return th->th_dev->dd_ops->dt_trans_cb_add(th, dcb);
}
/** @} dt */
+
+
+static inline int dt_declare_record_write(const struct lu_env *env,
+ struct dt_object *dt,
+ int size, loff_t pos,
+ struct thandle *th)
+{
+ int rc;
+
+ LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
+ LASSERT(th != NULL);
+ rc = dt->do_body_ops->dbo_declare_write(env, dt, size, pos, th);
+ return rc;
+}
+
+static inline int dt_declare_create(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_attr *attr,
+ struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
+ struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_declare_create);
+ return dt->do_ops->do_declare_create(env, dt, attr, hint, dof, th);
+}
+
+static inline int dt_create(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_attr *attr,
+ struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
+ struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_create);
+ return dt->do_ops->do_create(env, dt, attr, hint, dof, th);
+}
+
+static inline int dt_declare_destroy(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_declare_destroy);
+ return dt->do_ops->do_declare_destroy(env, dt, th);
+}
+
+static inline int dt_destroy(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_destroy);
+ return dt->do_ops->do_destroy(env, dt, th);
+}
+
+static inline void dt_read_lock(const struct lu_env *env,
+ struct dt_object *dt,
+ unsigned role)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_read_lock);
+ dt->do_ops->do_read_lock(env, dt, role);
+}
+
+static inline void dt_write_lock(const struct lu_env *env,
+ struct dt_object *dt,
+ unsigned role)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_write_lock);
+ dt->do_ops->do_write_lock(env, dt, role);
+}
+
+static inline void dt_read_unlock(const struct lu_env *env,
+ struct dt_object *dt)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_read_unlock);
+ dt->do_ops->do_read_unlock(env, dt);
+}
+
+static inline void dt_write_unlock(const struct lu_env *env,
+ struct dt_object *dt)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_write_unlock);
+ dt->do_ops->do_write_unlock(env, dt);
+}
+
+static inline int dt_write_locked(const struct lu_env *env,
+ struct dt_object *dt)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_write_locked);
+ return dt->do_ops->do_write_locked(env, dt);
+}
+
+static inline int dt_attr_get(const struct lu_env *env, struct dt_object *dt,
+ struct lu_attr *la, void *arg)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_attr_get);
+ return dt->do_ops->do_attr_get(env, dt, la, arg);
+}
+
+static inline int dt_declare_attr_set(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_attr *la,
+ struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_declare_attr_set);
+ return dt->do_ops->do_declare_attr_set(env, dt, la, th);
+}
+
+static inline int dt_attr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_attr *la, struct thandle *th,
+ struct lustre_capa *capa)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_attr_set);
+ return dt->do_ops->do_attr_set(env, dt, la, th, capa);
+}
+
+static inline int dt_declare_ref_add(const struct lu_env *env,
+ struct dt_object *dt, struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_declare_ref_add);
+ return dt->do_ops->do_declare_ref_add(env, dt, th);
+}
+
+static inline int dt_ref_add(const struct lu_env *env,
+ struct dt_object *dt, struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_ref_add);
+ return dt->do_ops->do_ref_add(env, dt, th);
+}
+
+static inline int dt_declare_ref_del(const struct lu_env *env,
+ struct dt_object *dt, struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_declare_ref_del);
+ return dt->do_ops->do_declare_ref_del(env, dt, th);
+}
+
+static inline int dt_ref_del(const struct lu_env *env,
+ struct dt_object *dt, struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_ref_del);
+ return dt->do_ops->do_ref_del(env, dt, th);
+}
+
+static inline struct obd_capa *dt_capa_get(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lustre_capa *old, __u64 opc)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_ref_del);
+ return dt->do_ops->do_capa_get(env, dt, old, opc);
+}
+
+static inline int dt_bufs_get(const struct lu_env *env, struct dt_object *d,
+ struct niobuf_remote *rnb,
+ struct niobuf_local *lnb, int rw,
+ struct lustre_capa *capa)
+{
+ LASSERT(d);
+ LASSERT(d->do_body_ops);
+ LASSERT(d->do_body_ops->dbo_bufs_get);
+ return d->do_body_ops->dbo_bufs_get(env, d, rnb->offset,
+ rnb->len, lnb, rw, capa);
+}
+
+static inline int dt_bufs_put(const struct lu_env *env, struct dt_object *d,
+ struct niobuf_local *lnb, int n)
+{
+ LASSERT(d);
+ LASSERT(d->do_body_ops);
+ LASSERT(d->do_body_ops->dbo_bufs_put);
+ return d->do_body_ops->dbo_bufs_put(env, d, lnb, n);
+}
+
+static inline int dt_write_prep(const struct lu_env *env, struct dt_object *d,
+ struct niobuf_local *lnb, int n)
+{
+ LASSERT(d);
+ LASSERT(d->do_body_ops);
+ LASSERT(d->do_body_ops->dbo_write_prep);
+ return d->do_body_ops->dbo_write_prep(env, d, lnb, n);
+}
+
+static inline int dt_declare_write_commit(const struct lu_env *env,
+ struct dt_object *d,
+ struct niobuf_local *lnb,
+ int n, struct thandle *th)
+{
+ LASSERTF(d != NULL, "dt is NULL when we want to declare write\n");
+ LASSERT(th != NULL);
+ return d->do_body_ops->dbo_declare_write_commit(env, d, lnb, n, th);
+}
+
+
+static inline int dt_write_commit(const struct lu_env *env,
+ struct dt_object *d, struct niobuf_local *lnb,
+ int n, struct thandle *th)
+{
+ LASSERT(d);
+ LASSERT(d->do_body_ops);
+ LASSERT(d->do_body_ops->dbo_write_commit);
+ return d->do_body_ops->dbo_write_commit(env, d, lnb, n, th);
+}
+
+static inline int dt_read_prep(const struct lu_env *env, struct dt_object *d,
+ struct niobuf_local *lnb, int n)
+{
+ LASSERT(d);
+ LASSERT(d->do_body_ops);
+ LASSERT(d->do_body_ops->dbo_read_prep);
+ return d->do_body_ops->dbo_read_prep(env, d, lnb, n);
+}
+
+static inline int dt_declare_punch(const struct lu_env *env,
+ struct dt_object *dt, __u64 start,
+ __u64 end, struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_body_ops);
+ LASSERT(dt->do_body_ops->do_declare_punch);
+ return dt->do_body_ops->do_declare_punch(env, dt, start, end, th);
+}
+
+static inline int dt_punch(const struct lu_env *env, struct dt_object *dt,
+ __u64 start, __u64 end, struct thandle *th,
+ struct lustre_capa *capa)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_body_ops);
+ LASSERT(dt->do_body_ops->do_punch);
+ return dt->do_body_ops->do_punch(env, dt, start, end, th, capa);
+}
+
+static inline int dt_fiemap_get(const struct lu_env *env, struct dt_object *d,
+ struct ll_user_fiemap *fm)
+{
+ LASSERT(d);
+ if (d->do_body_ops == NULL)
+ return -EPROTO;
+ LASSERT(d->do_body_ops->dbo_fiemap_get);
+ return d->do_body_ops->dbo_fiemap_get(env, d, fm);
+}
+
+static inline int dt_statfs(const struct lu_env *env, struct dt_device *dev,
+ cfs_kstatfs_t *osfs)
+{
+ LASSERT(dev);
+ LASSERT(dev->dd_ops);
+ LASSERT(dev->dd_ops->dt_statfs);
+ return dev->dd_ops->dt_statfs(env, dev, osfs);
+}
+
+static inline int dt_root_get(const struct lu_env *env, struct dt_device *dev,
+ struct lu_fid *f)
+{
+ LASSERT(dev);
+ LASSERT(dev->dd_ops);
+ LASSERT(dev->dd_ops->dt_root_get);
+ return dev->dd_ops->dt_root_get(env, dev, f);
+}
+
+static inline void dt_conf_get(const struct lu_env *env,
+ const struct dt_device *dev,
+ struct dt_device_param *param)
+{
+ LASSERT(dev);
+ LASSERT(dev->dd_ops);
+ LASSERT(dev->dd_ops->dt_conf_get);
+ return dev->dd_ops->dt_conf_get(env, dev, param);
+}
+
+static inline int dt_sync(const struct lu_env *env, struct dt_device *dev)
+{
+ LASSERT(dev);
+ LASSERT(dev->dd_ops);
+ LASSERT(dev->dd_ops->dt_sync);
+ return dev->dd_ops->dt_sync(env, dev);
+}
+
+static inline void dt_ro(const struct lu_env *env, struct dt_device *dev)
+{
+ LASSERT(dev);
+ LASSERT(dev->dd_ops);
+ LASSERT(dev->dd_ops->dt_ro);
+ return dev->dd_ops->dt_ro(env, dev);
+}
+
+static inline int dt_declare_insert(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_rec *rec,
+ const struct dt_key *key,
+ struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_index_ops);
+ LASSERT(dt->do_index_ops->dio_declare_insert);
+ return dt->do_index_ops->dio_declare_insert(env, dt, rec, key, th);
+}
+
+static inline int dt_insert(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_rec *rec,
+ const struct dt_key *key,
+ struct thandle *th,
+ struct lustre_capa *capa,
+ int noquota)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_index_ops);
+ LASSERT(dt->do_index_ops->dio_insert);
+ return dt->do_index_ops->dio_insert(env, dt, rec, key, th,
+ capa, noquota);
+}
+
+static inline int dt_declare_xattr_del(const struct lu_env *env,
+ struct dt_object *dt,
+ const char *name,
+ struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_declare_xattr_del);
+ return dt->do_ops->do_declare_xattr_del(env, dt, name, th);
+}
+
+static inline int dt_xattr_del(const struct lu_env *env,
+ struct dt_object *dt, const char *name,
+ struct thandle *th,
+ struct lustre_capa *capa)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_xattr_del);
+ return dt->do_ops->do_xattr_del(env, dt, name, th, capa);
+}
+
+static inline int dt_declare_xattr_set(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_buf *buf,
+ const char *name, int fl,
+ struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_declare_xattr_set);
+ return dt->do_ops->do_declare_xattr_set(env, dt, buf, name, fl, th);
+}
+
+static inline int dt_xattr_set(const struct lu_env *env,
+ struct dt_object *dt, const struct lu_buf *buf,
+ const char *name, int fl, struct thandle *th,
+ struct lustre_capa *capa)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_xattr_set);
+ return dt->do_ops->do_xattr_set(env, dt, buf, name, fl, th, capa);
+}
+
+static inline int dt_xattr_get(const struct lu_env *env,
+ struct dt_object *dt, struct lu_buf *buf,
+ const char *name, struct lustre_capa *capa)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_xattr_get);
+ return dt->do_ops->do_xattr_get(env, dt, buf, name, capa);
+}
+
+static inline int dt_xattr_list(const struct lu_env *env,
+ struct dt_object *dt, struct lu_buf *buf,
+ struct lustre_capa *capa)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_xattr_list);
+ return dt->do_ops->do_xattr_list(env, dt, buf, capa);
+}
+
+static inline int dt_declare_delete(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_key *key,
+ struct thandle *th)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_index_ops);
+ LASSERT(dt->do_index_ops->dio_declare_delete);
+ return dt->do_index_ops->dio_declare_delete(env, dt, key, th);
+}
+
+static inline int dt_delete(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_key *key,
+ struct thandle *th,
+ struct lustre_capa *capa)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_index_ops);
+ LASSERT(dt->do_index_ops->dio_delete);
+ return dt->do_index_ops->dio_delete(env, dt, key, th, capa);
+}
+
+static inline int dt_commit_async(const struct lu_env *env,
+ struct dt_device *dev)
+{
+ LASSERT(dev);
+ LASSERT(dev->dd_ops);
+ LASSERT(dev->dd_ops->dt_commit_async);
+ return dev->dd_ops->dt_commit_async(env, dev);
+}
+
+static inline int dt_lookup(const struct lu_env *env,
+ struct dt_object *dt,
+ struct dt_rec *rec,
+ const struct dt_key *key,
+ struct lustre_capa *capa)
+{
+ int ret;
+
+ LASSERT(dt);
+ LASSERT(dt->do_index_ops);
+ LASSERT(dt->do_index_ops->dio_lookup);
+
+ ret = dt->do_index_ops->dio_lookup(env, dt, rec, key, capa);
+ if (ret > 0)
+ ret = 0;
+ else if (ret == 0)
+ ret = -ENOENT;
+ return ret;
+}
#endif /* __LUSTRE_DT_OBJECT_H */
#define XATTR_NAME_LMA "trusted.lma"
#define XATTR_NAME_LMV "trusted.lmv"
#define XATTR_NAME_LINK "trusted.link"
+#define XATTR_NAME_VERSION "trusted.version"
struct lov_mds_md_v3 { /* LOV EA mds/wire data (little-endian) */
/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
void fld_server_fini(struct lu_server_fld *fld,
const struct lu_env *env);
+int fld_declare_server_create(struct lu_server_fld *fld,
+ const struct lu_env *env,
+ struct thandle *th);
+
int fld_server_create(struct lu_server_fld *fld,
const struct lu_env *env,
struct lu_seq_range *add_range,
struct lustre_capa *, int renewal);
int (*moo_object_sync)(const struct lu_env *, struct md_object *);
- dt_obj_version_t (*moo_version_get)(const struct lu_env *,
- struct md_object *);
- void (*moo_version_set)(const struct lu_env *, struct md_object *,
- dt_obj_version_t);
int (*moo_path)(const struct lu_env *env, struct md_object *obj,
char *path, int pathlen, __u64 *recno, int *linkno);
int (*moo_file_lock)(const struct lu_env *env, struct md_object *obj,
return m->mo_ops->moo_object_sync(env, m);
}
-static inline dt_obj_version_t mo_version_get(const struct lu_env *env,
- struct md_object *m)
-{
- LASSERT(m->mo_ops->moo_version_get);
- return m->mo_ops->moo_version_get(env, m);
-}
-
-static inline void mo_version_set(const struct lu_env *env,
- struct md_object *m, dt_obj_version_t ver)
-{
- LASSERT(m->mo_ops->moo_version_set);
- return m->mo_ops->moo_version_set(env, m, ver);
-}
-
static inline int mo_file_lock(const struct lu_env *env, struct md_object *m,
struct lov_mds_md *lmm,
struct ldlm_extent *extent,
mdd->mdd_child = lu2dt_dev(next);
/* Prepare transactions callbacks. */
- mdd->mdd_txn_cb.dtc_txn_start = mdd_txn_start_cb;
+ mdd->mdd_txn_cb.dtc_txn_start = NULL;
mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
mdd->mdd_txn_cb.dtc_txn_commit = NULL;
mdd->mdd_txn_cb.dtc_cookie = mdd;
if (m->mdd_obd_dev)
mdd_fini_obd(env, m, cfg);
orph_index_fini(env, m);
+ if (m->mdd_capa != NULL) {
+ lu_object_put(env, &m->mdd_capa->do_lu);
+ m->mdd_capa = NULL;
+ }
/* remove upcall device*/
md_upcall_fini(&m->mdd_md_dev);
EXIT;
time. In case of crash, we just restart with old log so we're
allright. */
if (endrec == cur) {
+ /* XXX: transaction is started by llog itself */
rc = mdd_changelog_write_header(mdd, CLM_PURGE);
if (rc)
goto out;
changed since the last purge) */
mdd->mdd_cl.mc_starttime = cfs_time_current_64();
+ /* XXX: transaction is started by llog itself */
rc = llog_cancel(ctxt, NULL, 1, (struct llog_cookie *)&endrec, 0);
out:
llog_ctxt_put(ctxt);
/* Status and action flags */
rec->cr.cr_markerflags = mdd->mdd_cl.mc_flags | markerflags;
+ /* XXX: transaction is started by llog itself */
rc = (mdd->mdd_cl.mc_mask & (1 << CL_MARK)) ?
mdd_changelog_llog_write(mdd, rec, NULL) : 0;
return -ENOSYS;
}
-static dt_obj_version_t dot_lustre_mdd_version_get(const struct lu_env *env,
- struct md_object *obj)
-{
- return 0;
-}
-
-static void dot_lustre_mdd_version_set(const struct lu_env *env,
- struct md_object *obj,
- dt_obj_version_t version)
-{
- return;
-}
-
static int dot_lustre_mdd_path(const struct lu_env *env, struct md_object *obj,
char *path, int pathlen, __u64 *recno, int *linkno)
{
.moo_close = dot_lustre_mdd_close,
.moo_capa_get = mdd_capa_get,
.moo_object_sync = dot_lustre_mdd_object_sync,
- .moo_version_get = dot_lustre_mdd_version_get,
- .moo_version_set = dot_lustre_mdd_version_set,
.moo_path = dot_lustre_mdd_path,
.moo_file_lock = dot_file_lock,
.moo_file_unlock = dot_file_unlock,
rc = mdd_init_obd(env, m, cfg);
if (rc) {
- CERROR("lov init error %d \n", rc);
+ CERROR("lov init error %d\n", rc);
GOTO(out, rc);
}
- rc = mdd_txn_init_credits(env, m);
- if (rc)
- break;
mdd_changelog_init(env, m);
break;
struct mdd_device *mdd = lu2mdd_dev(cdev);
struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
struct dt_object *root;
+ struct lu_fid fid;
int rc;
ENTRY;
GOTO(out, rc);
}
+ /* we use capa file to declare llog changes,
+ * will be fixed with new llog in 2.3 */
+ root = dt_store_open(env, mdd->mdd_child, "", CAPA_KEYS, &fid);
+ if (!IS_ERR(root))
+ mdd->mdd_capa = root;
+ else
+ rc = PTR_ERR(root);
+
out:
RETURN(rc);
}
RETURN(rc);
}
+int mdd_declare_llog_cancel(const struct lu_env *env, struct mdd_device *mdd,
+ struct thandle *handle)
+{
+ int rc;
+
+
+ /* XXX: this is a temporary solution to declare llog changes
+ * will be fixed in 2.3 with new llog implementation */
+
+ LASSERT(mdd->mdd_capa);
+
+ /* the llog record could be canceled either by modifying
+ * the plain llog's header or by destroying the llog itself
+ * when this record is the last one in it, it can't be known
+ * here, but the catlog's header will also be modified for
+ * the second case, then the first case can be covered and
+ * is no need to declare it */
+
+ /* destroy empty plain log */
+ rc = dt_declare_destroy(env, mdd->mdd_capa, handle);
+ if (rc)
+ return rc;
+
+ /* record the catlog's header if an empty plain log was destroyed */
+ rc = dt_declare_record_write(env, mdd->mdd_capa,
+ sizeof(struct llog_logid_rec), 0, handle);
+ return rc;
+}
+
struct mdd_changelog_user_data {
__u64 mcud_endrec; /**< purge record for this user */
__u64 mcud_minrec; /**< lowest changelog recno still referenced */
/* Special case: unregister this user */
if (mcud->mcud_endrec == MCUD_UNREGISTER) {
struct llog_cookie cookie;
- void *trans_h;
+ void *th;
struct mdd_device *mdd = mcud->mcud_mdd;
cookie.lgc_lgl = llh->lgh_id;
/* XXX This is a workaround for the deadlock of changelog
* adding vs. changelog cancelling. LU-81. */
- mdd_txn_param_build(mcud->mcud_env, mdd, MDD_TXN_UNLINK_OP, 0);
- trans_h = mdd_trans_start(mcud->mcud_env, mdd);
- if (IS_ERR(trans_h)) {
- CERROR("fsfilt_start_log failed: %ld\n",
- PTR_ERR(trans_h));
- RETURN(PTR_ERR(trans_h));
+ th = mdd_trans_create(mcud->mcud_env, mdd);
+ if (IS_ERR(th)) {
+ CERROR("Cannot get thandle\n");
+ RETURN(-ENOMEM);
}
+ rc = mdd_declare_llog_cancel(mcud->mcud_env, mdd, th);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(mcud->mcud_env, mdd, th);
+ if (rc)
+ GOTO(stop, rc);
+
rc = llog_cat_cancel_records(llh->u.phd.phd_cat_handle,
1, &cookie);
if (rc == 0)
mcud->mcud_usercount--;
- mdd_trans_stop(mcud->mcud_env, mdd, rc, trans_h);
+stop:
+ mdd_trans_stop(mcud->mcud_env, mdd, 0, th);
RETURN(rc);
}
static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
const struct lu_name *lname, struct lu_fid* fid,
int mask);
+static int mdd_declare_links_add(const struct lu_env *env,
+ struct mdd_object *mdd_obj,
+ struct thandle *handle);
static int mdd_links_add(const struct lu_env *env,
struct mdd_object *mdd_obj,
const struct lu_fid *pfid,
RETURN(rc);
}
+int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
+ int reclen, struct thandle *handle)
+{
+ int rc;
+
+ /* XXX: this is a temporary solution to declare llog changes
+ * will be fixed in 2.3 with new llog implementation */
+
+ LASSERT(mdd->mdd_capa);
+
+ /* record itself */
+ rc = dt_declare_record_write(env, mdd->mdd_capa, reclen, 0, handle);
+ if (rc)
+ return rc;
+
+ /* header will be updated as well */
+ rc = dt_declare_record_write(env, mdd->mdd_capa, LLOG_CHUNK_SIZE,
+ 0, handle);
+ if (rc)
+ return rc;
+
+ /* also we should be able to create new plain log */
+ rc = dt_declare_create(env, mdd->mdd_capa, NULL, NULL, NULL, handle);
+ if (rc)
+ return rc;
+
+ /* new record referencing new plain llog */
+ rc = dt_declare_record_write(env, mdd->mdd_capa,
+ sizeof(struct llog_logid_rec), 0, handle);
+ if (rc)
+ return rc;
+
+ /* catalog's header will be updated as well */
+ rc = dt_declare_record_write(env, mdd->mdd_capa, LLOG_CHUNK_SIZE,
+ 0, handle);
+
+ return rc;
+}
+
+int mdd_declare_changelog_store(const struct lu_env *env,
+ struct mdd_device *mdd,
+ const struct lu_name *fname,
+ struct thandle *handle)
+{
+ int reclen;
+
+ /* Not recording */
+ if (!(mdd->mdd_cl.mc_flags & CLM_ON))
+ return 0;
+
+ /* we'll be writing payload + llog header */
+ reclen = sizeof(struct llog_changelog_rec);
+ if (fname)
+ reclen += llog_data_len(fname->ln_namelen);
+
+ return mdd_declare_llog_record(env, mdd, reclen, handle);
+}
/** Store a namespace change changelog record
* If this fails, we must fail the whole transaction; we don't
return 0;
}
+static int mdd_declare_link(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *p,
+ struct mdd_object *c,
+ const struct lu_name *name,
+ struct thandle *handle)
+{
+ int rc;
+
+ rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_ref_add(env, c, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_attr_set(env, p, NULL, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_attr_set(env, c, NULL, handle);
+ if (rc)
+ return rc;
+
+ rc = mdd_declare_links_add(env, c, handle);
+ if (rc)
+ return rc;
+
+ rc = mdd_declare_changelog_store(env, mdd, name, handle);
+
+ return rc;
+}
+
static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
struct md_object *src_obj, const struct lu_name *lname,
struct md_attr *ma)
}
#endif
- mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP, 1);
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
GOTO(out_pending, rc = PTR_ERR(handle));
+ rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop, rc);
+
dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD);
if (dlh == NULL)
GOTO(out_trans, rc = -ENOMEM);
if (rc == 0)
rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
mdd_tobj, NULL, lname, handle);
+stop:
mdd_trans_stop(env, mdd, rc, handle);
out_pending:
#ifdef HAVE_QUOTA_SUPPORT
return rc;
}
+int mdd_declare_finish_unlink(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct md_attr *ma,
+ struct thandle *handle)
+{
+ int rc;
+
+ rc = orph_declare_index_insert(env, obj, handle);
+ if (rc)
+ return rc;
+
+ return mdd_declare_object_kill(env, obj, ma, handle);
+}
+
/* caller should take a lock before calling */
int mdd_finish_unlink(const struct lu_env *env,
struct mdd_object *obj, struct md_attr *ma,
PFID(mdd_object_fid(obj)),
obj->mod_count);
} else {
- rc = mdd_object_kill(env, obj, ma);
+ rc = mdd_object_kill(env, obj, ma, th);
if (rc == 0)
reset = 0;
}
RETURN(rc);
}
+static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
+ struct mdd_object *p, struct mdd_object *c,
+ const struct lu_name *name, struct md_attr *ma,
+ struct thandle *handle)
+{
+ int rc;
+
+ rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_ref_del(env, p, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_ref_del(env, c, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_ref_del(env, c, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_attr_set(env, p, NULL, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_attr_set(env, c, NULL, handle);
+ if (rc)
+ return rc;
+
+ rc = mdd_declare_finish_unlink(env, c, ma, handle);
+ if (rc)
+ return rc;
+
+ rc = mdd_declare_links_add(env, c, handle);
+ if (rc)
+ return rc;
+
+ rc = mdd_declare_changelog_store(env, mdd, name, handle);
+
+ return rc;
+}
+
static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
struct md_object *cobj, const struct lu_name *lname,
struct md_attr *ma)
LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
PFID(mdd_object_fid(mdd_cobj)));
- rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP, 1);
- if (rc)
- RETURN(rc);
-
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
+ rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
+ lname, ma, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop, rc);
+
dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
if (dlh == NULL)
GOTO(out_trans, rc = -ENOMEM);
mdd_cobj, mdd_pobj, NULL, lname, handle);
}
+stop:
mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
if (quota_opc)
int rc;
ENTRY;
+ /* XXX: this code won't be used ever:
+ * DNE uses slightly different approach */
+ LBUG();
+
#ifdef HAVE_QUOTA_SUPPORT
if (mds->mds_quota) {
if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) {
}
}
#endif
- mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP, 0);
- handle = mdd_trans_start(env, mdo2mdd(pobj));
+ handle = mdd_trans_create(env, mdo2mdd(pobj));
if (IS_ERR(handle))
GOTO(out_pending, rc = PTR_ERR(handle));
+ rc = mdd_trans_start(env, mdo2mdd(pobj), handle);
+
dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
if (dlh == NULL)
GOTO(out_trans, rc = -ENOMEM);
int rc;
ENTRY;
+ /* XXX: this code won't be used ever:
+ * DNE uses slightly different approach */
+ LBUG();
+
#ifdef HAVE_QUOTA_SUPPORT
if (mds->mds_quota) {
struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
}
}
#endif
- mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP, 0);
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
GOTO(out_pending, rc = PTR_ERR(handle));
+ rc = mdd_trans_start(env, mdd, handle);
+
dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT);
if (dlh == NULL)
GOTO(out_trans, rc = -ENOMEM);
int rc;
ENTRY;
+ /* XXX: this code won't be used ever:
+ * DNE uses slightly different approach */
+ LBUG();
+
#ifdef HAVE_QUOTA_SUPPORT
if (mds->mds_quota && !tobj) {
struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la;
}
}
#endif
- if (tobj && mdd_object_exists(mdd_tobj))
- mdd_log_txn_param_build(env, tobj, ma, MDD_TXN_RENAME_TGT_OP,1);
- else
- mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP, 1);
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
GOTO(out_pending, rc = PTR_ERR(handle));
+ rc = mdd_trans_start(env, mdd, handle);
+
dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT);
if (dlh == NULL)
GOTO(out_trans, rc = -ENOMEM);
}
+static int mdd_declare_create_data(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *obj,
+ int lmm_size,
+ struct thandle *handle)
+{
+ struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
+ int rc;
+
+ buf->lb_buf = NULL;
+ buf->lb_len = lmm_size;
+ rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
+ 0, handle);
+ if (rc)
+ return rc;
+
+ rc = mdd_declare_lov_objid_update(env, mdd, handle);
+
+ return rc;
+}
+
static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
struct md_object *cobj, const struct md_op_spec *spec,
struct md_attr *ma)
if (rc)
RETURN(rc);
- mdd_create_txn_param_build(env, mdd, lmm, MDD_TXN_CREATE_DATA_OP, 0);
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
GOTO(out_free, rc = PTR_ERR(handle));
+ rc = mdd_declare_create_data(env, mdd, son, lmm_size, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop, rc);
+
/*
* XXX: Setting the lov ea is not locked but setting the attr is locked?
* Should this be fixed?
if (rc == 0)
mdd_lov_objid_update(mdd, lmm);
+stop:
mdd_trans_stop(env, mdd, rc, handle);
out_free:
/* Finish mdd_lov_create() stuff. */
RETURN(rc);
}
+int mdd_declare_object_initialize(const struct lu_env *env,
+ struct mdd_object *child,
+ struct md_attr *ma,
+ struct thandle *handle)
+{
+ int rc;
+
+ rc = mdo_declare_attr_set(env, child, &ma->ma_attr, handle);
+ if (rc == 0 && S_ISDIR(ma->ma_attr.la_mode)) {
+ rc = mdo_declare_index_insert(env, child, mdo2fid(child),
+ dot, handle);
+ if (rc == 0)
+ rc = mdo_declare_ref_add(env, child, handle);
+ }
+ if (rc == 0)
+ mdd_declare_links_add(env, child, handle);
+
+ return rc;
+}
+
int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
const struct lu_name *lname, struct mdd_object *child,
struct md_attr *ma, struct thandle *handle,
RETURN(rc);
}
+static int mdd_declare_create(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *p,
+ struct mdd_object *c,
+ const struct lu_name *name,
+ struct md_attr *ma,
+ int lmm_size,
+ struct thandle *handle,
+ const struct md_op_spec *spec)
+{
+ struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
+ int rc = 0;
+
+ rc = mdd_declare_object_create_internal(env, p, c, ma, handle, spec);
+ if (rc)
+ GOTO(out, rc);
+
+ /* if dir, then can inherit default ACl */
+ buf->lb_buf = NULL;
+ buf->lb_len = lmm_size;
+ if (S_ISDIR(ma->ma_attr.la_mode)) {
+ rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_ACL_DEFAULT,
+ 0, handle);
+ if (rc == 0)
+ rc = mdo_declare_ref_add(env, p, handle);
+ }
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_ACL_ACCESS,
+ 0, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_declare_object_initialize(env, c, ma, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdo_declare_index_insert(env, p, mdo2fid(c),
+ name->ln_name, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV,
+ 0, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ if (S_ISLNK(ma->ma_attr.la_mode)) {
+ rc = dt_declare_record_write(env, mdd_object_child(c),
+ strlen(spec->u.sp_symname), 0,
+ handle);
+ if (rc)
+ GOTO(out, rc);
+ }
+ rc = mdo_declare_attr_set(env, p, &ma->ma_attr, handle);
+ if (rc)
+ return rc;
+
+ rc = mdd_declare_changelog_store(env, mdd, name, handle);
+ if (rc)
+ return rc;
+
+ rc = mdd_declare_lov_objid_update(env, mdd, handle);
+
+out:
+ return rc;
+}
+
+
/*
* Create object and insert it into namespace.
*/
got_def_acl = 1;
}
- mdd_create_txn_param_build(env, mdd, lmm, MDD_TXN_MKDIR_OP, 1);
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
GOTO(out_free, rc = PTR_ERR(handle));
+ rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, ma,
+ lmm_size, handle, spec);
+ if (rc)
+ GOTO(out_stop, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(out_stop, rc);
+
dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
if (dlh == NULL)
GOTO(out_trans, rc = -ENOMEM);
S_ISREG(attr->la_mode) ? CL_CREATE :
S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
0, son, mdd_pobj, NULL, lname, handle);
+out_stop:
mdd_trans_stop(env, mdd, rc, handle);
out_free:
/* finish lov_create stuff, free all temporary data */
RETURN(rc);
}
+static int mdd_declare_rename(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *mdd_spobj,
+ struct mdd_object *mdd_tpobj,
+ struct mdd_object *mdd_sobj,
+ struct mdd_object *mdd_tobj,
+ const struct lu_name *sname,
+ const struct lu_name *tname,
+ struct md_attr *ma,
+ struct thandle *handle)
+{
+ int rc;
+
+ LASSERT(mdd_spobj);
+ LASSERT(mdd_tpobj);
+ LASSERT(mdd_sobj);
+
+ /* name from source dir */
+ rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
+ if (rc)
+ return rc;
+
+ /* .. from source child */
+ if (S_ISDIR(mdd_object_type(mdd_sobj))) {
+ /* source child can be directory,
+ * counted by source dir's nlink */
+ rc = mdo_declare_ref_del(env, mdd_spobj, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_index_delete(env, mdd_sobj, dotdot, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_index_insert(env, mdd_sobj, mdo2fid(mdd_tpobj),
+ dotdot, handle);
+ if (rc)
+ return rc;
+
+ /* new target child can be directory,
+ * counted by target dir's nlink */
+ rc = mdo_declare_ref_add(env, mdd_tpobj, handle);
+ if (rc)
+ return rc;
+
+ }
+
+ rc = mdo_declare_attr_set(env, mdd_spobj, NULL, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_attr_set(env, mdd_sobj, NULL, handle);
+ if (rc)
+ return rc;
+ mdd_declare_links_add(env, mdd_sobj, handle);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_attr_set(env, mdd_tpobj, NULL, handle);
+ if (rc)
+ return rc;
+
+ /* new name */
+ rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
+ tname->ln_name, handle);
+ if (rc)
+ return rc;
+
+ /* name from target dir (old name), we declare it unconditionally
+ * as mdd_rename() calls delete unconditionally as well. so just
+ * to balance declarations vs calls to change ... */
+ rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle);
+ if (rc)
+ return rc;
+
+ if (mdd_tobj && mdd_object_exists(mdd_tobj)) {
+ /* delete target child in target parent directory */
+ rc = mdo_declare_ref_del(env, mdd_tobj, handle);
+ if (rc)
+ return rc;
+
+ if (S_ISDIR(mdd_object_type(mdd_tobj))) {
+ /* target child can be directory,
+ * delete "." reference in target child directory */
+ rc = mdo_declare_ref_del(env, mdd_tobj, handle);
+ if (rc)
+ return rc;
+
+ /* delete ".." reference in target parent directory */
+ rc = mdo_declare_ref_del(env, mdd_tpobj, handle);
+ if (rc)
+ return rc;
+ }
+
+ rc = mdo_declare_attr_set(env, mdd_tobj, NULL, handle);
+ if (rc)
+ return rc;
+
+ mdd_declare_links_add(env, mdd_tobj, handle);
+ if (rc)
+ return rc;
+
+ rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle);
+ if (rc)
+ return rc;
+ }
+
+ rc = mdd_declare_changelog_store(env, mdd, tname, handle);
+ if (rc)
+ return rc;
+
+ rc = mdd_declare_changelog_store(env, mdd, sname, handle);
+ if (rc)
+ return rc;
+
+ return rc;
+}
+
/* src object can be remote that is why we use only fid and type of object */
static int mdd_rename(const struct lu_env *env,
struct md_object *src_pobj, struct md_object *tgt_pobj,
}
}
#endif
- if (tobj && mdd_object_exists(mdd_tobj))
- mdd_log_txn_param_build(env, tobj, ma, MDD_TXN_RENAME_OP, 2);
- else
- mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP, 2);
- handle = mdd_trans_start(env, mdd);
+ mdd_sobj = mdd_object_find(env, mdd, lf);
+
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
GOTO(out_pending, rc = PTR_ERR(handle));
+ rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
+ mdd_tobj, lsname, ltname, ma, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop, rc);
+
/* FIXME: Should consider tobj and sobj too in rename_lock. */
rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
if (rc < 0)
if (sdlh == NULL || tdlh == NULL)
GOTO(cleanup, rc = -ENOMEM);
- mdd_sobj = mdd_object_find(env, mdd, lf);
rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
mdd_sobj, mdd_tobj, ma);
if (rc)
ltname, handle);
}
+stop:
mdd_trans_stop(env, mdd, rc, handle);
if (mdd_sobj)
mdd_object_put(env, mdd_sobj);
return 0;
}
+static int mdd_declare_links_add(const struct lu_env *env,
+ struct mdd_object *mdd_obj,
+ struct thandle *handle)
+{
+ int rc;
+
+ /* XXX: max size? */
+ rc = mdo_declare_xattr_set(env, mdd_obj,
+ mdd_buf_get_const(env, NULL, 4096),
+ XATTR_NAME_LINK, 0, handle);
+
+ return rc;
+}
+
/* For pathologic linkers, we don't want to spend lots of time scanning the
* link ea. Limit ourseleves to something reasonable; links not in the EA
* can be looked up via (slower) parent lookup.
* are already protected by ldlm lock */
#define MDD_DISABLE_PDO_LOCK 1
-enum mdd_txn_op {
- MDD_TXN_OBJECT_DESTROY_OP = 0,
- MDD_TXN_OBJECT_CREATE_OP,
- MDD_TXN_ATTR_SET_OP,
- MDD_TXN_XATTR_SET_OP,
- MDD_TXN_INDEX_INSERT_OP,
- MDD_TXN_INDEX_DELETE_OP,
- MDD_TXN_LINK_OP,
- MDD_TXN_UNLINK_OP,
- MDD_TXN_RENAME_OP,
- MDD_TXN_RENAME_TGT_OP,
- MDD_TXN_CREATE_DATA_OP,
- MDD_TXN_MKDIR_OP,
- MDD_TXN_CLOSE_OP,
- MDD_TXN_LAST_OP
-};
-
-struct mdd_txn_op_descr {
- enum mdd_txn_op mod_op;
- unsigned int mod_credits;
-};
-
/* Changelog flags */
/** changelog is recording */
#define CLM_ON 0x00001
struct lu_fid mdd_root_fid;
struct dt_device_param mdd_dt_conf;
struct dt_object *mdd_orphans;
+ struct dt_object *mdd_capa;
struct dt_txn_callback mdd_txn_cb;
cfs_proc_dir_entry_t *mdd_proc_entry;
struct lprocfs_stats *mdd_stats;
- struct mdd_txn_op_descr mdd_tod[MDD_TXN_LAST_OP];
struct mdd_changelog mdd_cl;
unsigned long mdd_atime_diff;
struct mdd_object *mdd_dot_lustre;
};
struct mdd_thread_info {
- struct txn_param mti_param;
struct lu_fid mti_fid;
struct lu_fid mti_fid2; /* used for be & cpu converting */
struct lu_attr mti_la;
struct lov_mds_md **lmm, int *lmm_size,
const struct md_op_spec *spec, struct lu_attr *la);
int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm);
+int mdd_declare_lov_objid_update(const struct lu_env *, struct mdd_device *,
+ struct thandle *);
void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm);
void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd,
struct lov_mds_md *lmm, int lmm_size,
struct lu_attr *attr,
struct thandle *handle,
int needacl);
+int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
+ struct md_attr *ma, struct thandle *handle);
int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
- struct md_attr *ma);
+ struct md_attr *ma, struct thandle *handle);
int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
struct md_attr *ma);
int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj,
struct lu_name *lname, struct lu_fid *pfid);
/* mdd_lov.c */
+int mdd_declare_unlink_log(const struct lu_env *env, struct mdd_object *obj,
+ struct md_attr *ma, struct thandle *handle);
int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd,
struct mdd_object *mdd_cobj, struct md_attr *ma);
struct thandle *);
int orph_index_init(const struct lu_env *env, struct mdd_device *mdd);
void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd);
-int mdd_txn_init_credits(const struct lu_env *env, struct mdd_device *mdd);
+int orph_declare_index_insert(const struct lu_env *, struct mdd_object *,
+ struct thandle *);
+int orph_declare_index_delete(const struct lu_env *, struct mdd_object *,
+ struct thandle *);
/* mdd_lproc.c */
void lprocfs_mdd_init_vars(struct lprocfs_static_vars *lvars);
int mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm);
int mdd_readpage(const struct lu_env *env, struct md_object *obj,
const struct lu_rdpg *rdpg);
+int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
+ int reclen, struct thandle *handle);
+int mdd_declare_changelog_store(const struct lu_env *env,
+ struct mdd_device *mdd,
+ const struct lu_name *fname,
+ struct thandle *handle);
int mdd_changelog(const struct lu_env *env, enum changelog_rec_type type,
int flags, struct md_object *obj);
+int mdd_declare_object_create_internal(const struct lu_env *env,
+ struct mdd_object *p,
+ struct mdd_object *c,
+ struct md_attr *ma,
+ struct thandle *handle,
+ const struct md_op_spec *spec);
/* mdd_quota.c*/
#ifdef HAVE_QUOTA_SUPPORT
int mdd_quota_notify(const struct lu_env *env, struct md_device *m);
#endif
/* mdd_trans.c */
-void mdd_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
- enum mdd_txn_op, int changelog_cnt);
-int mdd_create_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
- struct lov_mds_md *lmm, enum mdd_txn_op op,
- int changelog_cnt);
-int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj,
- struct md_attr *ma, enum mdd_txn_op,
- int changelog_cnt);
-void mdd_setattr_txn_param_build(const struct lu_env *env,
- struct md_object *obj,
- struct md_attr *ma, enum mdd_txn_op,
- int changelog_cnt);
-
int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
struct mdd_object *obj, struct lu_attr *la);
lu_object_put(env, &o->mod_obj.mo_lu);
}
-struct thandle* mdd_trans_start(const struct lu_env *env,
- struct mdd_device *);
-
+struct thandle *mdd_trans_create(const struct lu_env *env,
+ struct mdd_device *mdd);
+int mdd_trans_start(const struct lu_env *env, struct mdd_device *mdd,
+ struct thandle *th);
void mdd_trans_stop(const struct lu_env *env, struct mdd_device *mdd,
int rc, struct thandle *handle);
-
-int mdd_txn_start_cb(const struct lu_env *env, struct txn_param *param,
- void *cookie);
-
int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
void *cookie);
+int mdd_txn_start_cb(const struct lu_env *env, struct thandle *,
+ void *cookie);
/* mdd_device.c */
struct lu_object *mdd_object_alloc(const struct lu_env *env,
struct md_attr *ma);
int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o, __u32 mode,
struct thandle *handle);
+int __mdd_declare_acl_init(const struct lu_env *env, struct mdd_object *obj,
+ int is_dir, struct thandle *handle);
int __mdd_acl_init(const struct lu_env *env, struct mdd_object *obj,
struct lu_buf *buf, __u32 *mode, struct thandle *handle);
int __mdd_permission_internal(const struct lu_env *env, struct mdd_object *obj,
return next->do_ops->do_attr_get(env, next, la, capa);
}
-static inline int mdo_attr_set(const struct lu_env *env, struct mdd_object *obj,
- const struct lu_attr *la, struct thandle *handle,
+static inline int mdo_declare_attr_set(const struct lu_env *env,
+ struct mdd_object *obj,
+ const struct lu_attr *la,
+ struct thandle *handle)
+{
+ struct dt_object *next = mdd_object_child(obj);
+ return dt_declare_attr_set(env, next, la, handle);
+}
+
+static inline int mdo_attr_set(const struct lu_env *env,
+ struct mdd_object *obj,
+ const struct lu_attr *la,
+ struct thandle *handle,
struct lustre_capa *capa)
{
struct dt_object *next = mdd_object_child(obj);
return next->do_ops->do_xattr_get(env, next, buf, name, capa);
}
+static inline int mdo_declare_xattr_set(const struct lu_env *env,
+ struct mdd_object *obj,
+ const struct lu_buf *buf,
+ const char *name,
+ int fl, struct thandle *handle)
+{
+ struct dt_object *next = mdd_object_child(obj);
+ return dt_declare_xattr_set(env, next, buf, name, fl, handle);
+}
+
static inline int mdo_xattr_set(const struct lu_env *env,struct mdd_object *obj,
const struct lu_buf *buf, const char *name,
int fl, struct thandle *handle,
capa);
}
+static inline int mdo_declare_xattr_del(const struct lu_env *env,
+ struct mdd_object *obj,
+ const char *name,
+ struct thandle *handle)
+{
+ struct dt_object *next = mdd_object_child(obj);
+ return dt_declare_xattr_del(env, next, name, handle);
+}
+
static inline int mdo_xattr_del(const struct lu_env *env,struct mdd_object *obj,
const char *name, struct thandle *handle,
struct lustre_capa *capa)
return next->do_ops->do_index_try(env, next, feat);
}
-static inline void mdo_ref_add(const struct lu_env *env, struct mdd_object *obj,
- struct thandle *handle)
+static inline
+int mdo_declare_index_insert(const struct lu_env *env, struct mdd_object *obj,
+ const struct lu_fid *fid, const char *name,
+ struct thandle *handle)
+{
+ struct dt_object *next = mdd_object_child(obj);
+ int rc = 0;
+
+ /*
+ * if the object doesn't exist yet, then it's supposed to be created
+ * and declaration of the creation should be enough to insert ./..
+ */
+ if (mdd_object_exists(obj)) {
+ rc = -ENOTDIR;
+ if (dt_try_as_dir(env, next))
+ rc = dt_declare_insert(env, next,
+ (struct dt_rec *)fid,
+ (const struct dt_key *)name,
+ handle);
+ }
+
+ return rc;
+}
+
+static inline
+int mdo_declare_index_delete(const struct lu_env *env, struct mdd_object *obj,
+ const char *name, struct thandle *handle)
+{
+ struct dt_object *next = mdd_object_child(obj);
+
+ if (!dt_try_as_dir(env, next))
+ return -ENOTDIR;
+
+ return dt_declare_delete(env, next, (const struct dt_key *)name,
+ handle);
+}
+
+static inline int mdo_declare_ref_add(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct thandle *handle)
+{
+ struct dt_object *next = mdd_object_child(obj);
+ return dt_declare_ref_add(env, next, handle);
+}
+
+static inline int mdo_ref_add(const struct lu_env *env, struct mdd_object *obj,
+ struct thandle *handle)
{
struct dt_object *next = mdd_object_child(obj);
LASSERT(mdd_object_exists(obj));
return next->do_ops->do_ref_add(env, next, handle);
}
-static inline void mdo_ref_del(const struct lu_env *env, struct mdd_object *obj,
- struct thandle *handle)
+static inline int mdo_declare_ref_del(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct thandle *handle)
+{
+ struct dt_object *next = mdd_object_child(obj);
+ return dt_declare_ref_del(env, next, handle);
+}
+
+static inline int mdo_ref_del(const struct lu_env *env, struct mdd_object *obj,
+ struct thandle *handle)
{
struct dt_object *next = mdd_object_child(obj);
LASSERT(mdd_object_exists(obj));
}
static inline
+int mdo_declare_create_obj(const struct lu_env *env, struct mdd_object *o,
+ struct lu_attr *attr,
+ struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
+ struct thandle *handle)
+{
+ struct dt_object *next = mdd_object_child(o);
+ return next->do_ops->do_declare_create(env, next, attr, hint,
+ dof, handle);
+}
+
+static inline
int mdo_create_obj(const struct lu_env *env, struct mdd_object *o,
struct lu_attr *attr,
struct dt_allocation_hint *hint,
return next->do_ops->do_create(env, next, attr, hint, dof, handle);
}
+static inline
+int mdo_declare_destroy(const struct lu_env *env, struct mdd_object *o,
+ struct thandle *handle)
+{
+ struct dt_object *next = mdd_object_child(o);
+ return dt_declare_destroy(env, next, handle);
+}
+
+static inline
+int mdo_destroy(const struct lu_env *env, struct mdd_object *o,
+ struct thandle *handle)
+{
+ struct dt_object *next = mdd_object_child(o);
+ return dt_destroy(env, next, handle);
+}
+
static inline struct obd_capa *mdo_capa_get(const struct lu_env *env,
struct mdd_object *obj,
struct lustre_capa *old,
return mds_lov_prepare_objids(mdd->mdd_obd_dev, lmm);
}
+int mdd_declare_lov_objid_update(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct thandle *handle)
+{
+ struct obd_device *obd = mdd2obd_dev(mdd);
+ int size;
+
+ /* in prepare we create local files */
+ if (unlikely(mdd->mdd_capa == NULL))
+ return 0;
+
+ /* XXX: this is a temporary solution to declare llog changes
+ * will be fixed in 2.3 with new llog implementation */
+
+ size = obd->u.mds.mds_lov_desc.ld_tgt_count * sizeof(obd_id);
+ return dt_declare_record_write(env, mdd->mdd_capa, size, 0, handle);
+}
+
void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm)
{
/* copy mds_lov code is using wrong layer */
RETURN(rc);
}
+int mdd_declare_unlink_log(const struct lu_env *env, struct mdd_object *obj,
+ struct md_attr *ma, struct thandle *handle)
+{
+ struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+ int rc, stripe, i;
+
+ LASSERT(obj);
+ LASSERT(ma);
+
+ if (!S_ISREG(lu_object_attr(&obj->mod_obj.mo_lu)))
+ return 0;
+
+ rc = mdd_lmm_get_locked(env, obj, ma);
+ if (rc || !(ma->ma_valid & MA_LOV))
+ return rc;
+
+ LASSERT(ma->ma_lmm);
+ if (le32_to_cpu(ma->ma_lmm->lmm_magic) != LOV_MAGIC_V1 &&
+ le32_to_cpu(ma->ma_lmm->lmm_magic) != LOV_MAGIC_V3) {
+ CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
+ mdd->mdd_obd_dev->obd_name,
+ le32_to_cpu(ma->ma_lmm->lmm_magic),
+ PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
+ return -EINVAL;
+ }
+
+ if ((int)le32_to_cpu(ma->ma_lmm->lmm_stripe_count) < 0)
+ stripe = mdd2obd_dev(mdd)->u.mds.mds_lov_desc.ld_tgt_count;
+ else
+ stripe = le32_to_cpu(ma->ma_lmm->lmm_stripe_count);
+
+ for (i = 0; i < stripe; i++) {
+ rc = mdd_declare_llog_record(env, mdd,
+ sizeof(struct llog_unlink_rec),
+ handle);
+ if (rc)
+ return rc;
+ }
+
+ return rc;
+}
+
int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd,
struct mdd_object *mdd_cobj, struct md_attr *ma)
{
#include <lprocfs_status.h>
/* fid_be_cpu(), fid_cpu_to_be(). */
#include <lustre_fid.h>
+#include <obd_lov.h>
#include <lustre_param.h>
#include <lustre_mds.h>
RETURN(rc);
}
+int mdd_declare_object_create_internal(const struct lu_env *env,
+ struct mdd_object *p,
+ struct mdd_object *c,
+ struct md_attr *ma,
+ struct thandle *handle,
+ const struct md_op_spec *spec)
+{
+ struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
+ const struct dt_index_features *feat = spec->sp_feat;
+ int rc;
+ ENTRY;
+
+ if (feat != &dt_directory_features && feat != NULL)
+ dof->dof_type = DFT_INDEX;
+ else
+ dof->dof_type = dt_mode_to_dft(ma->ma_attr.la_mode);
+
+ dof->u.dof_idx.di_feat = feat;
+
+ rc = mdo_declare_create_obj(env, c, &ma->ma_attr, NULL, dof, handle);
+
+ RETURN(rc);
+}
+
int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
struct mdd_object *c, struct md_attr *ma,
struct thandle *handle,
RETURN(0);
LASSERT(mdd_obj != NULL);
+ LASSERT(handle != NULL);
if ((type >= CL_MTIME) && (type <= CL_ATIME) &&
cfs_time_before_64(mdd->mdd_cl.mc_starttime, mdd_obj->mod_cltime)) {
rec->cr.cr_namelen = 0;
mdd_obj->mod_cltime = cfs_time_current_64();
- if (handle == NULL) {
- /* Used for the close event only for now. */
- LASSERT(type == CL_CLOSE);
- LASSERT(mdd_env_info(env)->mti_param.tp_credits != 0);
- th = mdd_trans_start(env, mdd);
- if (IS_ERR(th))
- GOTO(err, rc = PTR_ERR(th));
- }
-
rc = mdd_changelog_llog_write(mdd, rec, handle ? : th);
if (th)
mdd_trans_stop(env, mdd, rc, th);
-err:
+
if (rc < 0) {
CERROR("changelog failed: rc=%d op%d t"DFID"\n",
rc, type, PFID(tfid));
int rc;
ENTRY;
- handle = mdd_trans_start(env, mdd);
-
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
return(PTR_ERR(handle));
+ rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop, rc);
+
rc = mdd_changelog_data_store(env, mdd, type, flags, mdd_obj,
handle);
+stop:
mdd_trans_stop(env, mdd, rc, handle);
RETURN(rc);
md2mdd_obj(obj), handle);
}
+static int mdd_declare_attr_set(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *obj,
+ const struct md_attr *ma,
+ struct lov_mds_md *lmm,
+ struct thandle *handle)
+{
+ struct lu_buf *buf = &mdd_env_info(env)->mti_buf;
+ int rc, stripe, i;
+
+ rc = mdo_declare_attr_set(env, obj, &ma->ma_attr, handle);
+ if (rc)
+ return rc;
+
+ rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+ if (rc)
+ return rc;
+
+ if (ma->ma_valid & MA_LOV) {
+ buf->lb_buf = NULL;
+ buf->lb_len = ma->ma_lmm_size;
+ rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV,
+ 0, handle);
+ if (rc)
+ return rc;
+ }
+
+ if (ma->ma_valid & (MA_HSM | MA_SOM)) {
+ buf->lb_buf = NULL;
+ buf->lb_len = sizeof(struct lustre_mdt_attrs);
+ rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA,
+ 0, handle);
+ if (rc)
+ return rc;
+ }
+
+ /* basically the log is the same as in unlink case */
+ if (lmm) {
+ if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 &&
+ le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) {
+ CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n",
+ mdd->mdd_obd_dev->obd_name,
+ le32_to_cpu(lmm->lmm_magic),
+ PFID(lu_object_fid(&obj->mod_obj.mo_lu)));
+ return -EINVAL;
+ }
+
+ stripe = mdd2obd_dev(mdd)->u.mds.mds_lov_desc.ld_tgt_count;
+ if ((int)le32_to_cpu(lmm->lmm_stripe_count) >= 0)
+ stripe = le32_to_cpu(lmm->lmm_stripe_count);
+
+ for (i = 0; i < stripe; i++) {
+ rc = mdd_declare_llog_record(env, mdd,
+ sizeof(struct llog_unlink_rec),
+ handle);
+ if (rc)
+ return rc;
+ }
+ }
+
+ return rc;
+}
+
/* set attr and LOV EA at once, return updated attr */
static int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
const struct md_attr *ma)
struct thandle *handle;
struct lov_mds_md *lmm = NULL;
struct llog_cookie *logcookies = NULL;
- int rc, lmm_size = 0, cookie_size = 0, chlog_cnt;
+ int rc, lmm_size = 0, cookie_size = 0;
struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix;
struct obd_device *obd = mdd->mdd_obd_dev;
struct mds_obd *mds = &obd->u.mds;
ma->ma_attr.la_valid == LA_ATIME && la_copy->la_valid == 0)
RETURN(0);
- /*TODO: add lock here*/
- /* start a log jounal handle if needed */
if (S_ISREG(mdd_object_type(mdd_obj)) &&
ma->ma_attr.la_valid & (LA_UID | LA_GID)) {
lmm_size = mdd_lov_mdsize(env, mdd);
lmm = mdd_max_lmm_get(env, mdd);
if (lmm == NULL)
- GOTO(no_trans, rc = -ENOMEM);
+ RETURN(-ENOMEM);
rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size,
XATTR_NAME_LOV);
if (rc < 0)
- GOTO(no_trans, rc);
- }
-
- chlog_cnt = 1;
- if (la_copy->la_valid && !(la_copy->la_valid & LA_FLAGS) && lmm_size) {
- chlog_cnt += (lmm->lmm_stripe_count >= 0) ?
- lmm->lmm_stripe_count : mds->mds_lov_desc.ld_tgt_count;
+ RETURN(rc);
}
- mdd_setattr_txn_param_build(env, obj, (struct md_attr *)ma,
- MDD_TXN_ATTR_SET_OP, chlog_cnt);
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
- GOTO(no_trans, rc = PTR_ERR(handle));
+ RETURN(PTR_ERR(handle));
+
+ rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma,
+ lmm_size > 0 ? lmm : NULL, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ /* permission changes may require sync operation */
+ if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
+ handle->th_sync = !!mdd->mdd_sync_permission;
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop, rc);
/* permission changes may require sync operation */
if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
if (rc == 0)
rc = mdd_attr_set_changelog(env, obj, handle,
ma->ma_attr.la_valid);
+stop:
mdd_trans_stop(env, mdd, rc, handle);
-no_trans:
if (rc == 0 && (lmm != NULL && lmm_size > 0 )) {
/*set obd attr, if needed*/
rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size,
RETURN(rc);
}
+static int mdd_declare_xattr_set(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *obj,
+ const struct lu_buf *buf,
+ const char *name,
+ struct thandle *handle)
+
+{
+ int rc;
+
+ rc = mdo_declare_xattr_set(env, obj, buf, name, 0, handle);
+ if (rc)
+ return rc;
+
+ /* Only record user xattr changes */
+ if ((strncmp("user.", name, 5) == 0))
+ rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+
+ return rc;
+}
+
/**
* The caller should guarantee to update the object ctime
* after xattr_set if needed.
if (rc)
RETURN(rc);
- mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
/* security-replated changes may require sync */
+ if (!strcmp(name, XATTR_NAME_ACL_ACCESS) &&
+ mdd->mdd_sync_permission == 1)
+ handle->th_sync = 1;
+
+ rc = mdd_declare_xattr_set(env, mdd, mdd_obj, buf, name, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ /* security-replated changes may require sync */
if (!strcmp(name, XATTR_NAME_ACL_ACCESS))
handle->th_sync |= mdd->mdd_sync_permission;
sizeof(POSIX_ACL_XATTR_DEFAULT) - 1) == 0))
rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
handle);
+
+stop:
mdd_trans_stop(env, mdd, rc, handle);
RETURN(rc);
}
+static int mdd_declare_xattr_del(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *obj,
+ const char *name,
+ struct thandle *handle)
+{
+ int rc;
+
+ rc = mdo_declare_xattr_del(env, obj, name, handle);
+ if (rc)
+ return rc;
+
+ /* Only record user xattr changes */
+ if ((strncmp("user.", name, 5) == 0))
+ rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+
+ return rc;
+}
+
/**
* The caller should guarantee to update the object ctime
* after xattr_set if needed.
if (rc)
RETURN(rc);
- mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 1);
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
+ rc = mdd_declare_xattr_del(env, mdd, mdd_obj, name, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(stop, rc);
+
mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
rc = mdo_xattr_del(env, mdd_obj, name, handle,
mdd_object_capa(env, mdd_obj));
rc = mdd_changelog_data_store(env, mdd, CL_XATTR, 0, mdd_obj,
handle);
+stop:
mdd_trans_stop(env, mdd, rc, handle);
RETURN(rc);
int rc;
ENTRY;
+ /* XXX: this code won't be used ever:
+ * DNE uses slightly different approach */
+ LBUG();
+
/*
* Check -ENOENT early here because we need to get object type
* to calculate credits before transaction start
LASSERT(mdd_object_exists(mdd_obj) > 0);
- rc = mdd_log_txn_param_build(env, obj, ma, MDD_TXN_UNLINK_OP, 0);
- if (rc)
- RETURN(rc);
-
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
RETURN(-ENOMEM);
+ rc = mdd_trans_start(env, mdd, handle);
+
mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
rc = mdd_unlink_sanity_check(env, NULL, mdd_obj, ma);
int rc = 0;
ENTRY;
+ /* XXX: this code won't be used ever:
+ * DNE uses slightly different approach */
+ LBUG();
+
#ifdef HAVE_QUOTA_SUPPORT
if (mds->mds_quota) {
quota_opc = FSFILT_OP_CREATE_PARTIAL_CHILD;
}
#endif
- mdd_txn_param_build(env, mdd, MDD_TXN_OBJECT_CREATE_OP, 0);
- handle = mdd_trans_start(env, mdd);
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
GOTO(out_pending, rc = PTR_ERR(handle));
+ rc = mdd_trans_start(env, mdd, handle);
+
mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
rc = mdd_oc_sanity_check(env, mdd_obj, ma);
if (rc)
int rc;
ENTRY;
- mdd_txn_param_build(env, mdd, MDD_TXN_XATTR_SET_OP, 0);
- handle = mdd_trans_start(env, mdd);
+ /* XXX: this code won't be used ever:
+ * DNE uses slightly different approach */
+ LBUG();
+
+ handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
RETURN(-ENOMEM);
+ rc = mdd_trans_start(env, mdd, handle);
+
mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
if (rc == 0)
return rc;
}
+int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj,
+ struct md_attr *ma, struct thandle *handle)
+{
+ int rc;
+
+ rc = mdd_declare_unlink_log(env, obj, ma, handle);
+ if (rc)
+ return rc;
+
+ return mdo_declare_destroy(env, obj, handle);
+}
+
/* return md_attr back,
* if it is last unlink then return lov ea + llog cookie*/
int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj,
- struct md_attr *ma)
+ struct md_attr *ma, struct thandle *handle)
{
int rc = 0;
ENTRY;
rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
obj, ma);
}
+
+ if (rc == 0)
+ rc = mdo_destroy(env, obj, handle);
+
RETURN(rc);
}
+static int mdd_declare_close(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct md_attr *ma,
+ struct thandle *handle)
+{
+ int rc;
+
+ rc = orph_declare_index_delete(env, obj, handle);
+ if (rc)
+ return rc;
+
+ return mdd_declare_object_kill(env, obj, ma, handle);
+}
+
/*
* No permission check is needed.
*/
if (mdd_obj->mod_count == 1 &&
(mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0) {
again:
- rc = mdd_log_txn_param_build(env, obj, ma,
- MDD_TXN_UNLINK_OP, 1);
- if (rc)
- RETURN(rc);
- handle = mdd_trans_start(env, mdo2mdd(obj));
+ handle = mdd_trans_create(env, mdo2mdd(obj));
if (IS_ERR(handle))
RETURN(PTR_ERR(handle));
+
+ rc = mdd_declare_close(env, mdd_obj, ma, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_declare_changelog_store(env, mdd, NULL, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+ if (rc)
+ GOTO(stop, rc);
}
mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
if (mdd_obj->mod_count == 0 && mdd_obj->mod_flags & ORPHAN_OBJ) {
/* remove link to object from orphan index */
+ LASSERT(handle != NULL);
rc = __mdd_orphan_del(env, mdd_obj, handle);
if (rc == 0) {
CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
ma->ma_attr_flags & MDS_CLOSE_CLEANUP) {
rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr);
} else {
- rc = mdd_object_kill(env, mdd_obj, ma);
+ if (handle == NULL) {
+ handle = mdd_trans_create(env, mdo2mdd(obj));
+ if (IS_ERR(handle))
+ GOTO(out, rc = PTR_ERR(handle));
+
+ rc = mdd_declare_object_kill(env, mdd_obj, ma,
+ handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_declare_changelog_store(env, mdd,
+ NULL, handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+ if (rc)
+ GOTO(out, rc);
+ }
+
+ rc = mdd_object_kill(env, mdd_obj, ma, handle);
if (rc == 0)
reset = 0;
}
if (rc == 0 &&
(mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
!(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
- if (handle == 0)
- mdd_txn_param_build(env, mdd, MDD_TXN_CLOSE_OP, 1);
+ if (handle == NULL) {
+ handle = mdd_trans_create(env, mdo2mdd(obj));
+ if (IS_ERR(handle))
+ GOTO(stop, rc = IS_ERR(handle));
+
+ rc = mdd_declare_changelog_store(env, mdd, NULL,
+ handle);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+ if (rc)
+ GOTO(stop, rc);
+ }
+
mdd_changelog_data_store(env, mdd, CL_CLOSE, mode,
mdd_obj, handle);
}
+stop:
if (handle != NULL)
mdd_trans_stop(env, mdd, rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
int len;
int recsize;
- len = iops->key_size(env, it);
+ len = iops->key_size(env, it);
/* IAM iterator can return record with zero len. */
if (len == 0)
recsize = lu_dirent_calc_size(len, attr);
if (nob >= recsize) {
- result = iops->rec(env, it, ent, attr);
+ result = iops->rec(env, it, (struct dt_rec *)ent, attr);
if (result == -ESTALE)
goto next;
if (result != 0)
return next->do_ops->do_object_sync(env, next);
}
-static dt_obj_version_t mdd_version_get(const struct lu_env *env,
- struct md_object *obj)
-{
- struct mdd_object *mdd_obj = md2mdd_obj(obj);
-
- LASSERT(mdd_object_exists(mdd_obj));
- return do_version_get(env, mdd_object_child(mdd_obj));
-}
-
-static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
- dt_obj_version_t version)
-{
- struct mdd_object *mdd_obj = md2mdd_obj(obj);
-
- LASSERT(mdd_object_exists(mdd_obj));
- do_version_set(env, mdd_object_child(mdd_obj), version);
-}
-
const struct md_object_operations mdd_obj_ops = {
.moo_permission = mdd_permission,
.moo_attr_get = mdd_attr_get,
.moo_changelog = mdd_changelog,
.moo_capa_get = mdd_capa_get,
.moo_object_sync = mdd_object_sync,
- .moo_version_get = mdd_version_get,
- .moo_version_set = mdd_version_set,
.moo_path = mdd_path,
.moo_file_lock = mdd_file_lock,
.moo_file_unlock = mdd_file_unlock,
#include "mdd_internal.h"
const char orph_index_name[] = "PENDING";
+const char *dotdot = "..";
enum {
ORPH_OP_UNLINK,
}
+int orph_declare_index_insert(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct thandle *th)
+{
+ struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+ int rc;
+
+ rc = dt_declare_insert(env, mdd->mdd_orphans, NULL, NULL, th);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_ref_add(env, obj, th);
+ if (rc)
+ return rc;
+
+ if (!S_ISDIR(mdd_object_type(obj)))
+ return 0;
+
+ rc = mdo_declare_ref_add(env, obj, th);
+ if (rc)
+ return rc;
+
+ rc = dt_declare_ref_add(env, mdd->mdd_orphans, th);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_index_delete(env, obj, dotdot, th);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_index_insert(env, obj, NULL, dotdot, th);
+
+ return rc;
+}
+
static int orph_index_insert(const struct lu_env *env,
struct mdd_object *obj,
__u32 op,
struct dt_object *dor = mdd->mdd_orphans;
const struct lu_fid *lf_dor = lu_object_fid(&dor->do_lu);
struct dt_object *next = mdd_object_child(obj);
- const struct dt_key *dotdot = (const struct dt_key *) "..";
int rc;
ENTRY;
if (!dt_try_as_dir(env, next))
goto out;
next->do_index_ops->dio_delete(env, next,
- dotdot, th, BYPASS_CAPA);
+ (const struct dt_key *)dotdot,
+ th, BYPASS_CAPA);
next->do_index_ops->dio_insert(env, next,
- (struct dt_rec *) lf_dor,
- dotdot, th, BYPASS_CAPA, 1);
+ (struct dt_rec *)lf_dor,
+ (const struct dt_key *)dotdot,
+ th, BYPASS_CAPA, 1);
out:
if (rc == 0)
if (rc == 0)
rc = mdd_lov_destroy(env, mdd, obj, la);
}
+ mdo_destroy(env, obj, th);
RETURN(rc);
}
+int orph_declare_index_delete(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct thandle *th)
+{
+ struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+ int rc;
+
+ rc = dt_declare_delete(env, mdd->mdd_orphans, NULL, th);
+ if (rc)
+ return rc;
+
+ rc = mdo_declare_ref_del(env, obj, th);
+ if (rc)
+ return rc;
+
+ if (S_ISDIR(mdd_object_type(obj))) {
+ rc = mdo_declare_ref_del(env, obj, th);
+ if (rc)
+ return rc;
+
+ rc = dt_declare_ref_del(env, mdd->mdd_orphans, th);
+ }
+
+ return rc;
+}
+
static int orph_index_delete(const struct lu_env *env,
struct mdd_object *obj,
__u32 op,
ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
ma->ma_valid = 0;
- mdd_log_txn_param_build(env, &obj->mod_obj, ma, MDD_TXN_UNLINK_OP, 0);
- th = mdd_trans_start(env, mdd);
+ th = mdd_trans_create(env, mdd);
if (IS_ERR(th)) {
CERROR("Cannot get thandle\n");
RETURN(-ENOMEM);
}
+ rc = orph_declare_index_delete(env, obj, th);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_declare_object_kill(env, obj, ma, th);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdd_trans_start(env, mdd, th);
+ if (rc)
+ GOTO(stop, rc);
mdd_write_lock(env, obj, MOR_TGT_CHILD);
if (likely(obj->mod_count == 0)) {
mdd_orphan_write_unlock(env, mdd);
}
mdd_write_unlock(env, obj);
+
+stop:
mdd_trans_stop(env, mdd, 0, th);
RETURN(rc);
#include "mdd_internal.h"
-static int dto_txn_credits[DTO_NR];
-
-int mdd_txn_start_cb(const struct lu_env *env, struct txn_param *param,
- void *cookie)
-{
- struct mdd_device *mdd = cookie;
- struct obd_device *obd = mdd2obd_dev(mdd);
- /* Each transaction updates lov objids, the credits should be added for
- * this */
- int blk, shift = mdd->mdd_dt_conf.ddp_block_shift;
- blk = ((obd->u.mds.mds_lov_desc.ld_tgt_count * sizeof(obd_id) +
- (1 << shift) - 1) >> shift) + 1;
-
- /* add lov objids credits */
- param->tp_credits += blk * dto_txn_credits[DTO_WRITE_BLOCK] +
- dto_txn_credits[DTO_WRITE_BASE];
-
- return 0;
-}
-
int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
void *cookie)
{
return mds_lov_write_objids(obd);
}
-void mdd_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
- enum mdd_txn_op op, int changelog_cnt)
+struct thandle *mdd_trans_create(const struct lu_env *env,
+ struct mdd_device *mdd)
{
- LASSERT(0 <= op && op < MDD_TXN_LAST_OP);
-
- txn_param_init(&mdd_env_info(env)->mti_param,
- mdd->mdd_tod[op].mod_credits);
- if (changelog_cnt > 0) {
- txn_param_credit_add(&mdd_env_info(env)->mti_param,
- changelog_cnt * dto_txn_credits[DTO_LOG_REC]);
- }
+ return mdd_child_ops(mdd)->dt_trans_create(env, mdd->mdd_child);
}
-int mdd_create_txn_param_build(const struct lu_env *env, struct mdd_device *mdd,
- struct lov_mds_md *lmm, enum mdd_txn_op op,
- int changelog_cnt)
+int mdd_trans_start(const struct lu_env *env, struct mdd_device *mdd,
+ struct thandle *th)
{
- int stripes = 0;
- ENTRY;
-
- LASSERT(op == MDD_TXN_CREATE_DATA_OP || op == MDD_TXN_MKDIR_OP);
-
- if (lmm == NULL)
- GOTO(out, 0);
- /* only replay create request will cause lov_objid update */
- if (!mdd->mdd_obd_dev->obd_recovering)
- GOTO(out, 0);
-
- /* add possible orphan unlink rec credits used in lov_objid update */
- if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1) {
- stripes = le32_to_cpu(((struct lov_mds_md_v1*)lmm)
- ->lmm_stripe_count);
- } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3){
- stripes = le32_to_cpu(((struct lov_mds_md_v3*)lmm)
- ->lmm_stripe_count);
- } else {
- CERROR("Unknown lmm type %X\n", le32_to_cpu(lmm->lmm_magic));
- LBUG();
- }
-out:
- mdd_txn_param_build(env, mdd, op, stripes + changelog_cnt);
- RETURN(0);
-}
-
-int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj,
- struct md_attr *ma, enum mdd_txn_op op,
- int changelog_cnt)
-{
- struct mdd_device *mdd = mdo2mdd(&md2mdd_obj(obj)->mod_obj);
- int rc, stripe = 0;
- ENTRY;
-
- if (S_ISDIR(lu_object_attr(&obj->mo_lu)))
- GOTO(out, rc = 0);
-
- LASSERT(op == MDD_TXN_UNLINK_OP || op == MDD_TXN_RENAME_OP ||
- op == MDD_TXN_RENAME_TGT_OP);
- rc = mdd_lmm_get_locked(env, md2mdd_obj(obj), ma);
- if (rc || !(ma->ma_valid & MA_LOV))
- GOTO(out, rc);
-
- LASSERTF(le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V1 ||
- le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V3,
- "%08x", le32_to_cpu(ma->ma_lmm->lmm_magic));
-
- if ((int)le32_to_cpu(ma->ma_lmm->lmm_stripe_count) < 0)
- stripe = mdd2obd_dev(mdd)->u.mds.mds_lov_desc.ld_tgt_count;
- else
- stripe = le32_to_cpu(ma->ma_lmm->lmm_stripe_count);
-
-out:
- mdd_txn_param_build(env, mdd, op, stripe + changelog_cnt);
-
- RETURN(rc);
-}
-
-void mdd_setattr_txn_param_build(const struct lu_env *env,
- struct md_object *obj,
- struct md_attr *ma, enum mdd_txn_op op,
- int changelog_cnt)
-{
- struct mdd_device *mdd = mdo2mdd(&md2mdd_obj(obj)->mod_obj);
-
- mdd_txn_param_build(env, mdd, op, changelog_cnt);
- if (ma->ma_attr.la_valid & (LA_UID | LA_GID))
- txn_param_credit_add(&mdd_env_info(env)->mti_param,
- dto_txn_credits[DTO_ATTR_SET_CHOWN]);
-}
-
-static void mdd_txn_init_dto_credits(const struct lu_env *env,
- struct mdd_device *mdd, int *dto_credits)
-{
- int op, credits;
- for (op = 0; op < DTO_NR; op++) {
- credits = mdd_child_ops(mdd)->dt_credit_get(env, mdd->mdd_child,
- op);
- LASSERT(credits >= 0);
- dto_txn_credits[op] = credits;
- }
-}
-
-int mdd_txn_init_credits(const struct lu_env *env, struct mdd_device *mdd)
-{
- int op;
-
- /* Init credits for each ops. */
- mdd_txn_init_dto_credits(env, mdd, dto_txn_credits);
-
- /* Calculate the mdd credits. */
- for (op = MDD_TXN_OBJECT_DESTROY_OP; op < MDD_TXN_LAST_OP; op++) {
- int *c = &mdd->mdd_tod[op].mod_credits;
- int *dt = dto_txn_credits;
- mdd->mdd_tod[op].mod_op = op;
- switch(op) {
- case MDD_TXN_OBJECT_DESTROY_OP:
- /* Unused now */
- *c = dt[DTO_OBJECT_DELETE];
- break;
- case MDD_TXN_OBJECT_CREATE_OP:
- /* OI INSERT + CREATE OBJECT */
- *c = dt[DTO_INDEX_INSERT] +
- dt[DTO_OBJECT_CREATE];
- break;
- case MDD_TXN_ATTR_SET_OP:
- /* ATTR set + XATTR(lsm, lmv) set */
- *c = dt[DTO_ATTR_SET_BASE] +
- dt[DTO_XATTR_SET];
- break;
- case MDD_TXN_XATTR_SET_OP:
- *c = dt[DTO_XATTR_SET];
- break;
- case MDD_TXN_INDEX_INSERT_OP:
- *c = dt[DTO_INDEX_INSERT];
- break;
- case MDD_TXN_INDEX_DELETE_OP:
- *c = dt[DTO_INDEX_DELETE];
- break;
- case MDD_TXN_LINK_OP:
- *c = dt[DTO_INDEX_INSERT];
- break;
- case MDD_TXN_UNLINK_OP:
- /* delete index + Unlink log +
- * mdd orphan handling */
- *c = dt[DTO_INDEX_DELETE] +
- dt[DTO_INDEX_DELETE] +
- dt[DTO_INDEX_INSERT] * 2 +
- dt[DTO_XATTR_SET] * 3;
- break;
- case MDD_TXN_RENAME_OP:
- /* 2 delete index + 1 insert + Unlink log */
- *c = 2 * dt[DTO_INDEX_DELETE] +
- dt[DTO_INDEX_INSERT] +
- dt[DTO_INDEX_DELETE] +
- dt[DTO_INDEX_INSERT] * 2 +
- dt[DTO_XATTR_SET] * 3;
- break;
- case MDD_TXN_RENAME_TGT_OP:
- /* index insert + index delete */
- *c = dt[DTO_INDEX_DELETE] +
- dt[DTO_INDEX_INSERT] +
- dt[DTO_INDEX_DELETE] +
- dt[DTO_INDEX_INSERT] * 2 +
- dt[DTO_XATTR_SET] * 3;
- break;
- case MDD_TXN_CREATE_DATA_OP:
- /* same as set xattr(lsm) */
- *c = dt[DTO_XATTR_SET];
- break;
- case MDD_TXN_MKDIR_OP:
- /* INDEX INSERT + OI INSERT +
- * CREATE_OBJECT_CREDITS
- * SET_MD CREDITS is already counted in
- * CREATE_OBJECT CREDITS
- */
- *c = 2 * dt[DTO_INDEX_INSERT] +
- dt[DTO_OBJECT_CREATE];
- break;
- case MDD_TXN_CLOSE_OP:
- *c = 0;
- break;
- default:
- CERROR("Invalid op %d init its credit\n", op);
- LBUG();
- }
- }
- RETURN(0);
-}
-
-struct thandle* mdd_trans_start(const struct lu_env *env,
- struct mdd_device *mdd)
-{
- struct txn_param *p = &mdd_env_info(env)->mti_param;
- struct thandle *th;
-
- th = mdd_child_ops(mdd)->dt_trans_start(env, mdd->mdd_child, p);
- return th;
+ return mdd_child_ops(mdd)->dt_trans_start(env, mdd->mdd_child, th);
}
void mdd_trans_stop(const struct lu_env *env, struct mdd_device *mdd,
/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
*/
/*
* Copyright (c) 2011 Whamcloud, Inc.
int i, rc;
mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- mdt_trans_credit_init(env, mdt, MDT_TXN_CAPA_KEYS_WRITE_OP);
- th = mdt_trans_start(env, mdt);
+ th = mdt_trans_create(env, mdt);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
+ rc = dt_declare_record_write(env, mdt->mdt_ck_obj,
+ sizeof(*tmp) * 3, 0, th);
+ if (rc)
+ goto stop;
+
+ rc = mdt_trans_start(env, mdt, th);
+ if (rc)
+ goto stop;
+
tmp = &mti->mti_capa_key;
for (i = 0; i < 2; i++) {
break;
}
+stop:
mdt_trans_stop(env, mdt, th);
CDEBUG(D_INFO, "write capability keys rc = %d:\n", rc);
*(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
rc = -ENOENT;
} else {
- version = mo_version_get(mti->mti_env, mdt_object_child(obj));
+ version = dt_version_get(mti->mti_env, mdt_obj2dt(obj));
*(__u64 *)data->ioc_inlbuf2 = version;
rc = 0;
}
struct lr_server_data mti_lsd;
struct lsd_client_data mti_lcd;
loff_t mti_off;
- struct txn_param mti_txn_param;
struct lu_buf mti_buf;
struct lustre_capa_key mti_capa_key;
const struct md_attr *);
void mdt_reconstruct_open(struct mdt_thread_info *, struct mdt_lock_handle *);
-void mdt_trans_credit_init(const struct lu_env *env,
- struct mdt_device *mdt,
- enum mdt_txn_op op);
-struct thandle* mdt_trans_start(const struct lu_env *env,
- struct mdt_device *mdt);
+struct thandle *mdt_trans_create(const struct lu_env *env,
+ struct mdt_device *mdt);
+int mdt_trans_start(const struct lu_env *env, struct mdt_device *mdt,
+ struct thandle *th);
void mdt_trans_stop(const struct lu_env *env,
struct mdt_device *mdt, struct thandle *th);
int mdt_record_write(const struct lu_env *env,
return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
}
+static inline struct dt_object *mdt_obj2dt(struct mdt_object *mo)
+{
+ struct lu_object *lo;
+ struct mdt_device *mdt = mdt_dev(mo->mot_obj.mo_lu.lo_dev);
+
+ lo = lu_object_locate(mo->mot_obj.mo_lu.lo_header,
+ mdt->mdt_bottom->dd_lu_dev.ld_type);
+ return lu2dt(lo);
+}
+
/* mdt/mdt_identity.c */
#define MDT_IDENTITY_UPCALL_PATH "/usr/sbin/l_getidentity"
return buf;
}
-static inline int mdt_trans_credit_get(const struct lu_env *env,
- struct mdt_device *mdt,
- enum mdt_txn_op op)
-{
- struct dt_device *dev = mdt->mdt_bottom;
- int cr;
- switch (op) {
- case MDT_TXN_CAPA_KEYS_WRITE_OP:
- case MDT_TXN_LAST_RCVD_WRITE_OP:
- cr = dev->dd_ops->dt_credit_get(env,
- dev,
- DTO_WRITE_BLOCK);
- break;
- default:
- LBUG();
- }
- return cr;
-}
-
-void mdt_trans_credit_init(const struct lu_env *env,
- struct mdt_device *mdt,
- enum mdt_txn_op op)
+struct thandle *mdt_trans_create(const struct lu_env *env,
+ struct mdt_device *mdt)
{
- struct mdt_thread_info *mti;
- struct txn_param *p;
- int cr;
-
- mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- p = &mti->mti_txn_param;
-
- cr = mdt_trans_credit_get(env, mdt, op);
- txn_param_init(p, cr);
+ return mdt->mdt_bottom->dd_ops->dt_trans_create(env, mdt->mdt_bottom);
}
-struct thandle* mdt_trans_start(const struct lu_env *env,
- struct mdt_device *mdt)
+int mdt_trans_start(const struct lu_env *env, struct mdt_device *mdt,
+ struct thandle *th)
{
- struct mdt_thread_info *mti;
- struct txn_param *p;
-
- mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- p = &mti->mti_txn_param;
-
- return dt_trans_start(env, mdt->mdt_bottom, p);
+ return dt_trans_start(env, mdt->mdt_bottom, th);
}
void mdt_trans_stop(const struct lu_env *env,
return rc;
}
+static int mdt_declare_last_rcvd_header_write(const struct lu_env *env,
+ struct mdt_device *mdt,
+ struct thandle *th)
+{
+ struct mdt_thread_info *mti;
+
+ mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+
+ return dt_declare_record_write(env, mdt->mdt_lut.lut_last_rcvd,
+ sizeof(mti->mti_lsd), 0, th);
+}
+
static int mdt_last_rcvd_header_write(const struct lu_env *env,
struct mdt_device *mdt,
struct thandle *th)
return rc;
}
+static int mdt_declare_last_rcvd_write(const struct lu_env *env,
+ struct mdt_device *mdt,
+ loff_t off, struct thandle *th)
+{
+ return dt_declare_record_write(env, mdt->mdt_lut.lut_last_rcvd,
+ sizeof(struct lsd_client_data), off, th);
+}
+
static int mdt_last_rcvd_write(const struct lu_env *env,
struct mdt_device *mdt,
struct lsd_client_data *lcd,
mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
- th = mdt_trans_start(env, mdt);
+ th = mdt_trans_create(env, mdt);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
+ rc = mdt_declare_last_rcvd_header_write(env, mdt, th);
+ if (rc)
+ goto out;
+
+ rc = mdt_trans_start(env, mdt, th);
+ if (rc)
+ goto out;
+
CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
mdt->mdt_lut.lut_obd->u.obt.obt_mount_count,
mdt->mdt_lut.lut_last_transno);
cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
rc = mdt_last_rcvd_header_write(env, mdt, th);
+
+out:
mdt_trans_stop(env, mdt, th);
return rc;
}
if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
RETURN(-ENOSPC);
- mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
-
- th = mdt_trans_start(env, mdt);
+ th = mdt_trans_create(env, mdt);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
+ rc = mdt_declare_last_rcvd_write(env, mdt, off, th);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdt_trans_start(env, mdt, th);
+ if (rc)
+ GOTO(stop, rc);
+
/*
* Until this operations will be committed the sync is needed
* for this export. This should be done _after_ starting the
rc = mdt_last_rcvd_write(env, mdt, ted->ted_lcd, &off, th);
CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n",
cl_idx, ted->ted_lr_off, (int)sizeof(*(ted->ted_lcd)));
+
+stop:
mdt_trans_stop(env, mdt, th);
RETURN(rc);
* be in server data or in client data in case of failure */
mdt_server_data_update(env, mdt);
- mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
- th = mdt_trans_start(env, mdt);
+ th = mdt_trans_create(env, mdt);
if (IS_ERR(th))
GOTO(free, rc = PTR_ERR(th));
+ rc = mdt_declare_last_rcvd_write(env, mdt, off, th);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = mdt_trans_start(env, mdt, th);
+ if (rc)
+ GOTO(stop, rc);
+
cfs_mutex_down(&ted->ted_lcd_lock);
memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
rc = mdt_last_rcvd_write(env, mdt, ted->ted_lcd, &off, th);
cfs_mutex_up(&ted->ted_lcd_lock);
+
+stop:
mdt_trans_stop(env, mdt, th);
CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in "
/* add credits for last_rcvd update */
static int mdt_txn_start_cb(const struct lu_env *env,
- struct txn_param *param, void *cookie)
+ struct thandle *th, void *cookie)
{
struct mdt_device *mdt = cookie;
- param->tp_credits += mdt_trans_credit_get(env, mdt,
- MDT_TXN_LAST_RCVD_WRITE_OP);
- return 0;
-}
-
-/* Set new object versions */
-static void mdt_version_set(struct mdt_thread_info *info)
-{
- if (info->mti_mos != NULL) {
- mo_version_set(info->mti_env, mdt_object_child(info->mti_mos),
- info->mti_transno);
- info->mti_mos = NULL;
- }
+ /* XXX: later we'll be declaring this at specific offset */
+ return dt_declare_record_write(env, mdt->mdt_lut.lut_last_rcvd,
+ sizeof(struct lsd_client_data), 0, th);
}
/* Update last_rcvd records with latests transaction data */
LASSERT(req != NULL && req->rq_repmsg != NULL);
/** VBR: set new versions */
- if (txn->th_result == 0)
- mdt_version_set(mti);
+ if (txn->th_result == 0 && mti->mti_mos != NULL) {
+ dt_version_set(env, mdt_obj2dt(mti->mti_mos),
+ mti->mti_transno, txn);
+ mti->mti_mos = NULL;
+ }
/* filling reply data */
CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
LASSERT(o);
LASSERT(mdt_object_exists(o) >= 0);
if (mdt_object_exists(o) > 0)
- *version = mo_version_get(info->mti_env, mdt_object_child(o));
+ *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
else
*version = ENOENT_VERSION;
CDEBUG(D_INODE, "FID "DFID" version is "LPX64"\n",
/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
EXPORT_SYMBOL(dt_txn_callback_del);
int dt_txn_hook_start(const struct lu_env *env,
- struct dt_device *dev, struct txn_param *param)
+ struct dt_device *dev, struct thandle *th)
{
- int result;
+ int rc = 0;
struct dt_txn_callback *cb;
- result = 0;
+ if (th->th_local)
+ return 0;
+
cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
if (cb->dtc_txn_start == NULL ||
!(cb->dtc_tag & env->le_ctx.lc_tags))
continue;
- result = cb->dtc_txn_start(env, param, cb->dtc_cookie);
- if (result < 0)
+ rc = cb->dtc_txn_start(env, th, cb->dtc_cookie);
+ if (rc < 0)
break;
}
- return result;
+ return rc;
}
EXPORT_SYMBOL(dt_txn_hook_start);
{
struct dt_device *dev = txn->th_dev;
struct dt_txn_callback *cb;
- int result;
+ int rc = 0;
+
+ if (txn->th_local)
+ return 0;
- result = 0;
cfs_list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
if (cb->dtc_txn_stop == NULL ||
!(cb->dtc_tag & env->le_ctx.lc_tags))
continue;
- result = cb->dtc_txn_stop(env, txn, cb->dtc_cookie);
- if (result < 0)
+ rc = cb->dtc_txn_stop(env, txn, cb->dtc_cookie);
+ if (rc < 0)
break;
}
- return result;
+ return rc;
}
EXPORT_SYMBOL(dt_txn_hook_stop);
{
struct dt_txn_callback *cb;
+ if (txn->th_local)
+ return;
+
cfs_list_for_each_entry(cb, &txn->th_dev->dd_txn_callbacks,
dtc_linkage) {
if (cb->dtc_txn_commit)
}
return result;
}
-
EXPORT_SYMBOL(dt_mode_to_dft);
+
/**
* lookup fid for object named \a name in directory \a dir.
*/
-static int dt_lookup(const struct lu_env *env, struct dt_object *dir,
- const char *name, struct lu_fid *fid)
+int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
+ const char *name, struct lu_fid *fid)
{
- struct dt_rec *rec = (struct dt_rec *)fid;
- const struct dt_key *key = (const struct dt_key *)name;
- int result;
-
- if (dt_try_as_dir(env, dir)) {
- result = dir->do_index_ops->dio_lookup(env, dir, rec, key,
- BYPASS_CAPA);
- if (result > 0)
- result = 0;
- else if (result == 0)
- result = -ENOENT;
- } else
- result = -ENOTDIR;
- return result;
+ if (dt_try_as_dir(env, dir))
+ return dt_lookup(env, dir, (struct dt_rec *)fid,
+ (const struct dt_key *)name, BYPASS_CAPA);
+ return -ENOTDIR;
}
-
+EXPORT_SYMBOL(dt_lookup_dir);
/**
* get object for given \a fid.
*/
struct dt_object *obj = dfh->dfh_o;
int result;
- result = dt_lookup(env, obj, entry, fid);
+ result = dt_lookup_dir(env, obj, entry, fid);
lu_object_put(env, &obj->do_lu);
if (result == 0) {
obj = dt_locate(env, dt, fid);
struct dt_object *o;
int result;
- result = dt_lookup(env, p, name, fid);
+ result = dt_lookup_dir(env, p, name, fid);
if (result == 0){
o = dt_locate(env, dt, fid);
}
LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
LASSERT(th != NULL);
+ LASSERT(dt->do_body_ops);
+ LASSERT(dt->do_body_ops->dbo_write);
rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1);
if (rc == buf->lb_len)
rc = 0;
}
EXPORT_SYMBOL(dt_record_write);
+int dt_declare_version_set(const struct lu_env *env, struct dt_object *o,
+ struct thandle *th)
+{
+ struct lu_buf vbuf;
+ char *xname = XATTR_NAME_VERSION;
+
+ LASSERT(o);
+ vbuf.lb_buf = NULL;
+ vbuf.lb_len = sizeof(dt_obj_version_t);
+ return dt_declare_xattr_set(env, o, &vbuf, xname, 0, th);
+
+}
+EXPORT_SYMBOL(dt_declare_version_set);
+
+void dt_version_set(const struct lu_env *env, struct dt_object *o,
+ dt_obj_version_t version, struct thandle *th)
+{
+ struct lu_buf vbuf;
+ char *xname = XATTR_NAME_VERSION;
+ int rc;
+
+ LASSERT(o);
+ vbuf.lb_buf = &version;
+ vbuf.lb_len = sizeof(version);
+
+ rc = dt_xattr_set(env, o, &vbuf, xname, 0, th, BYPASS_CAPA);
+ if (rc != 0)
+ CDEBUG(D_INODE, "Can't set version, rc %d\n", rc);
+ return;
+}
+EXPORT_SYMBOL(dt_version_set);
+
+dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o)
+{
+ struct lu_buf vbuf;
+ char *xname = XATTR_NAME_VERSION;
+ dt_obj_version_t version;
+ int rc;
+
+ LASSERT(o);
+ vbuf.lb_buf = &version;
+ vbuf.lb_len = sizeof(version);
+ rc = dt_xattr_get(env, o, &vbuf, xname, BYPASS_CAPA);
+ if (rc != sizeof(version)) {
+ CDEBUG(D_INODE, "Can't get version, rc %d\n", rc);
+ version = 0;
+ }
+ return version;
+}
+EXPORT_SYMBOL(dt_version_get);
+
const struct dt_index_features dt_directory_features;
EXPORT_SYMBOL(dt_directory_features);
static const struct dt_object_operations osd_obj_ops;
static const struct dt_object_operations osd_obj_ea_ops;
static const struct dt_body_operations osd_body_ops;
+static const struct dt_body_operations osd_body_ops_new;
static const struct dt_index_operations osd_index_iam_ops;
static const struct dt_index_operations osd_index_ea_ops;
+#define OSD_TRACK_DECLARES
+#ifdef OSD_TRACK_DECLARES
+#define OSD_DECLARE_OP(oh, op) { \
+ LASSERT(oh->ot_handle == NULL); \
+ ((oh)->ot_declare_ ##op)++; }
+#define OSD_EXEC_OP(handle, op) { \
+ struct osd_thandle *oh; \
+ oh = container_of0(handle, struct osd_thandle, ot_super);\
+ LASSERT((oh)->ot_declare_ ##op > 0); \
+ ((oh)->ot_declare_ ##op)--; }
+#else
+#define OSD_DECLARE_OP(oh, op)
+#define OSD_EXEC_OP(oh, op)
+#endif
+
struct osd_thandle {
struct thandle ot_super;
handle_t *ot_handle;
cfs_list_t ot_dcb_list;
/* Link to the device, for debugging. */
struct lu_ref_link *ot_dev_link;
+ int ot_credits;
+
+#ifdef OSD_TRACK_DECLARES
+ unsigned char ot_declare_attr_set;
+ unsigned char ot_declare_punch;
+ unsigned char ot_declare_xattr_set;
+ unsigned char ot_declare_create;
+ unsigned char ot_declare_destroy;
+ unsigned char ot_declare_ref_add;
+ unsigned char ot_declare_ref_del;
+ unsigned char ot_declare_write;
+ unsigned char ot_declare_insert;
+ unsigned char ot_declare_delete;
+#endif
#if OSD_THANDLE_STATS
/** time when this handle was allocated */
#endif
};
+/**
+ * Basic transaction credit op
+ */
+enum dt_txn_op {
+ DTO_INDEX_INSERT,
+ DTO_INDEX_DELETE,
+ DTO_INDEX_UPDATE,
+ DTO_OBJECT_CREATE,
+ DTO_OBJECT_DELETE,
+ DTO_ATTR_SET_BASE,
+ DTO_XATTR_SET,
+ DTO_LOG_REC, /**< XXX temporary: dt layer knows nothing about llog. */
+ DTO_WRITE_BASE,
+ DTO_WRITE_BLOCK,
+ DTO_ATTR_SET_CHOWN,
+
+ DTO_NR
+};
+
/*
* Helpers.
*/
LINVRNT(osd_invariant(obj));
result = osd_fid_lookup(env, obj, lu_object_fid(l));
+ obj->oo_dt.do_body_ops = &osd_body_ops_new;
if (result == 0) {
if (obj->oo_inode != NULL)
osd_object_init0(obj);
* Concurrency: doesn't access mutable data.
*/
static int osd_param_is_sane(const struct osd_device *dev,
- const struct txn_param *param)
+ const struct thandle *th)
{
- return param->tp_credits <= osd_journal(dev)->j_max_transaction_buffers;
+ struct osd_thandle *oh;
+ oh = container_of0(th, struct osd_thandle, ot_super);
+ return oh->ot_credits <= osd_journal(dev)->j_max_transaction_buffers;
}
/*
OBD_FREE_PTR(oh);
}
+static struct thandle *osd_trans_create(const struct lu_env *env,
+ struct dt_device *d)
+{
+ struct osd_thread_info *oti = osd_oti_get(env);
+ struct osd_thandle *oh;
+ struct thandle *th;
+ ENTRY;
+
+ th = ERR_PTR(-ENOMEM);
+ OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
+ if (oh != NULL) {
+ th = &oh->ot_super;
+ th->th_dev = d;
+ th->th_result = 0;
+ th->th_tags = LCT_TX_HANDLE;
+ oh->ot_credits = 0;
+ oti->oti_dev = osd_dt_dev(d);
+ CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
+ osd_th_alloced(oh);
+ }
+ RETURN(th);
+}
+
/*
* Concurrency: shouldn't matter.
*/
-static struct thandle *osd_trans_start(const struct lu_env *env,
- struct dt_device *d,
- struct txn_param *p)
+int osd_trans_start(const struct lu_env *env, struct dt_device *d,
+ struct thandle *th)
{
+ struct osd_thread_info *oti = osd_oti_get(env);
struct osd_device *dev = osd_dt_dev(d);
handle_t *jh;
struct osd_thandle *oh;
- struct thandle *th;
- int hook_res;
+ int rc;
ENTRY;
- hook_res = dt_txn_hook_start(env, d, p);
- if (hook_res != 0)
- RETURN(ERR_PTR(hook_res));
+ LASSERT(current->journal_info == NULL);
- if (osd_param_is_sane(dev, p)) {
- OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
- if (oh != NULL) {
- struct osd_thread_info *oti = osd_oti_get(env);
+ oh = container_of0(th, struct osd_thandle, ot_super);
+ LASSERT(oh != NULL);
+ LASSERT(oh->ot_handle == NULL);
- /*
- * XXX temporary stuff. Some abstraction layer should
- * be used.
- */
- oti->oti_dev = dev;
- CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
- osd_th_alloced(oh);
- jh = ldiskfs_journal_start_sb(osd_sb(dev), p->tp_credits);
- osd_th_started(oh);
- if (!IS_ERR(jh)) {
- oh->ot_handle = jh;
- th = &oh->ot_super;
- th->th_dev = d;
- th->th_result = 0;
- th->th_sync = 0;
- lu_device_get(&d->dd_lu_dev);
- oh->ot_dev_link = lu_ref_add
- (&d->dd_lu_dev.ld_reference,
- "osd-tx", th);
- /* add commit callback */
- lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
- lu_context_enter(&th->th_ctx);
- LASSERT(oti->oti_txns == 0);
- LASSERT(oti->oti_r_locks == 0);
- LASSERT(oti->oti_w_locks == 0);
- oti->oti_txns++;
- } else {
- OBD_FREE_PTR(oh);
- th = (void *)jh;
- }
- } else
- th = ERR_PTR(-ENOMEM);
- } else {
- CERROR("Invalid transaction parameters\n");
- th = ERR_PTR(-EINVAL);
+ rc = dt_txn_hook_start(env, d, th);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ oh->ot_credits += LDISKFS_QUOTA_INIT_BLOCKS(osd_sb(dev));
+
+ if (!osd_param_is_sane(dev, th)) {
+ CWARN("%s: too many transaction credits (%d > %d)\n",
+ d->dd_lu_dev.ld_obd->obd_name, oh->ot_credits,
+ osd_journal(dev)->j_max_transaction_buffers);
+#ifdef OSD_TRACK_DECLARES
+ CERROR(" attr_set: %d, punch: %d, xattr_set: %d,\n",
+ oh->ot_declare_attr_set, oh->ot_declare_punch,
+ oh->ot_declare_xattr_set);
+ CERROR(" create: %d, ref_add: %d, ref_del: %d, write: %d\n",
+ oh->ot_declare_create, oh->ot_declare_ref_add,
+ oh->ot_declare_ref_del, oh->ot_declare_write);
+ CERROR(" insert: %d, delete: %d\n",
+ oh->ot_declare_insert, oh->ot_declare_delete);
+#endif
}
- RETURN(th);
+ /*
+ * XXX temporary stuff. Some abstraction layer should
+ * be used.
+ */
+ jh = ldiskfs_journal_start_sb(osd_sb(dev), oh->ot_credits);
+ osd_th_started(oh);
+ if (!IS_ERR(jh)) {
+ oh->ot_handle = jh;
+ LASSERT(oti->oti_txns == 0);
+ lu_context_init(&th->th_ctx, th->th_tags);
+ lu_context_enter(&th->th_ctx);
+
+ lu_device_get(&d->dd_lu_dev);
+ oh->ot_dev_link = lu_ref_add(&d->dd_lu_dev.ld_reference,
+ "osd-tx", th);
+
+ /*
+ * XXX: current rule is that we first start tx,
+ * then lock object(s), but we can't use
+ * this rule for data (due to locking specifics
+ * in ldiskfs). also in long-term we'd like to
+ * use usually-used (locks;tx) ordering. so,
+ * UGLY thing is that we'll use one ordering for
+ * data (ofd) and reverse ordering for metadata
+ * (mdd). then at some point we'll fix the latter
+ */
+ if (lu_device_is_md(&d->dd_lu_dev)) {
+ LASSERT(oti->oti_r_locks == 0);
+ LASSERT(oti->oti_w_locks == 0);
+ }
+
+ oti->oti_txns++;
+ rc = 0;
+ } else {
+ rc = PTR_ERR(jh);
+ }
+out:
+ RETURN(rc);
}
/*
* Concurrency: shouldn't matter.
*/
-static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
+static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
{
- int result;
- struct osd_thandle *oh;
+ int rc = 0;
+ struct osd_thandle *oh;
struct osd_thread_info *oti = osd_oti_get(env);
ENTRY;
oh = container_of0(th, struct osd_thandle, ot_super);
+
if (oh->ot_handle != NULL) {
handle_t *hdl = oh->ot_handle;
hdl->h_sync = th->th_sync;
+
/*
* add commit callback
* notice we don't do this in osd_trans_start()
LASSERT(oti->oti_txns == 1);
oti->oti_txns--;
- LASSERT(oti->oti_r_locks == 0);
- LASSERT(oti->oti_w_locks == 0);
- result = dt_txn_hook_stop(env, th);
- if (result != 0)
- CERROR("Failure in transaction hook: %d\n", result);
+ /*
+ * XXX: current rule is that we first start tx,
+ * then lock object(s), but we can't use
+ * this rule for data (due to locking specifics
+ * in ldiskfs). also in long-term we'd like to
+ * use usually-used (locks;tx) ordering. so,
+ * UGLY thing is that we'll use one ordering for
+ * data (ofd) and reverse ordering for metadata
+ * (mdd). then at some point we'll fix the latter
+ */
+ if (lu_device_is_md(&th->th_dev->dd_lu_dev)) {
+ LASSERT(oti->oti_r_locks == 0);
+ LASSERT(oti->oti_w_locks == 0);
+ }
+ rc = dt_txn_hook_stop(env, th);
+ if (rc != 0)
+ CERROR("Failure in transaction hook: %d\n", rc);
oh->ot_handle = NULL;
OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
- result = ldiskfs_journal_stop(hdl));
- if (result != 0)
- CERROR("Failure to stop transaction: %d\n", result);
+ rc = ldiskfs_journal_stop(hdl));
+ if (rc != 0)
+ CERROR("Failure to stop transaction: %d\n", rc);
} else {
OBD_FREE_PTR(oh);
}
- EXIT;
+
+ RETURN(rc);
}
static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
}
/*
- * Concurrency: no concurrent access is possible that late in object
- * life-cycle.
- */
-static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
-{
- const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
- struct osd_device *osd = osd_obj2dev(obj);
- struct osd_thread_info *oti = osd_oti_get(env);
- struct txn_param *prm = &oti->oti_txn;
- struct lu_env *env_del_obj = &oti->oti_obj_delete_tx_env;
- struct thandle *th;
- int result;
-
- lu_env_init(env_del_obj, LCT_DT_THREAD);
- txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
- OSD_TXN_INODE_DELETE_CREDITS);
- th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
- if (!IS_ERR(th)) {
- result = osd_oi_delete(osd_oti_get(env_del_obj),
- &osd->od_oi, fid, th);
- osd_trans_stop(env_del_obj, th);
- } else
- result = PTR_ERR(th);
-
- lu_env_fini(env_del_obj);
- return result;
-}
-
-/*
* Called just before object is freed. Releases all resources except for
* object itself (that is released by osd_object_free()).
*
osd_index_fini(obj);
if (inode != NULL) {
- int result;
-
- if (osd_inode_unlinked(inode)) {
- result = osd_inode_remove(env, obj);
- if (result != 0)
- LU_OBJECT_DEBUG(D_ERROR, env, l,
- "Failed to cleanup: %d\n",
- result);
- }
-
iput(inode);
obj->oo_inode = NULL;
}
static void osd_object_release(const struct lu_env *env,
struct lu_object *l)
{
- struct osd_object *o = osd_obj(l);
-
- LASSERT(!lu_object_is_dying(l->lo_header));
- if (o->oo_inode != NULL && osd_inode_unlinked(o->oo_inode))
- cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, &l->lo_header->loh_flags);
}
/*
/*
* Concurrency: shouldn't matter.
*/
-lvfs_sbdev_type fsfilt_ldiskfs_journal_sbdev(struct super_block *);
static void osd_ro(const struct lu_env *env, struct dt_device *d)
{
+ struct super_block *sb = osd_sb(osd_dt_dev(d));
ENTRY;
CERROR("*** setting device %s read-only ***\n", LUSTRE_OSD_NAME);
- __lvfs_set_rdonly(lvfs_sbdev(osd_sb(osd_dt_dev(d))),
- fsfilt_ldiskfs_journal_sbdev(osd_sb(osd_dt_dev(d))));
+ __lvfs_set_rdonly(sb->s_bdev, LDISKFS_SB(sb)->journal_bdev);
EXIT;
}
-
/*
* Concurrency: serialization provided by callers.
*/
/**
* Unused now
*/
- [DTO_IDNEX_UPDATE] = 16,
+ [DTO_INDEX_UPDATE] = 16,
/**
* Create a object. The same as create object in EXT3.
* DATA_TRANS_BLOCKS(14) +
*/
[DTO_OBJECT_CREATE] = 25,
/**
- * Unused now
+ * XXX: real credits to be fixed
*/
[DTO_OBJECT_DELETE] = 25,
/**
- * Attr set credits.
- * 3(inode bits, group, GDT)
+ * Attr set credits (inode)
*/
- [DTO_ATTR_SET_BASE] = 3,
+ [DTO_ATTR_SET_BASE] = 1,
/**
* Xattr set. The same as xattr of EXT3.
* DATA_TRANS_BLOCKS(14)
[DTO_XATTR_SET] = 14,
[DTO_LOG_REC] = 14,
/**
- * creadits for inode change during write.
+ * credits for inode change during write.
*/
[DTO_WRITE_BASE] = 3,
/**
/**
* Unused now.
*/
- [DTO_IDNEX_UPDATE] = 16,
+ [DTO_INDEX_UPDATE] = 16,
/*
* Create a object. Same as create object in EXT3 filesystem.
* DATA_TRANS_BLOCKS(16) +
[DTO_ATTR_SET_CHOWN]= 68,
};
-static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
- enum dt_txn_op op)
-{
- LASSERT(ARRAY_SIZE(osd_dto_credits_noquota) ==
- ARRAY_SIZE(osd_dto_credits_quota));
- LASSERT(0 <= op && op < ARRAY_SIZE(osd_dto_credits_noquota));
-#ifdef HAVE_QUOTA_SUPPORT
- if (test_opt(osd_sb(osd_dt_dev(d)), QUOTA))
- return osd_dto_credits_quota[op];
- else
-#endif
- return osd_dto_credits_noquota[op];
-}
-
static const struct dt_device_operations osd_dt_ops = {
.dt_root_get = osd_root_get,
.dt_statfs = osd_statfs,
+ .dt_trans_create = osd_trans_create,
.dt_trans_start = osd_trans_start,
.dt_trans_stop = osd_trans_stop,
.dt_trans_cb_add = osd_trans_cb_add,
.dt_sync = osd_sync,
.dt_ro = osd_ro,
.dt_commit_async = osd_commit_async,
- .dt_credit_get = osd_credit_get,
.dt_init_capa_ctxt = osd_init_capa_ctxt,
.dt_init_quota_ctxt= osd_init_quota_ctxt,
};
return 0;
}
+static int osd_declare_attr_set(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_attr *attr,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(handle != NULL);
+ LASSERT(osd_invariant(obj));
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, attr_set);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+ return 0;
+}
+
static int osd_inode_setattr(const struct lu_env *env,
struct inode *inode, const struct lu_attr *attr)
{
if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
return -EACCES;
+ OSD_EXEC_OP(handle, attr_set);
+
cfs_spin_lock(&obj->oo_guard);
rc = osd_inode_setattr(env, obj->oo_inode, attr);
cfs_spin_unlock(&obj->oo_guard);
uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
}
+static int osd_declare_object_create(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_attr *attr,
+ struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, insert);
+ OSD_DECLARE_OP(oh, create);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_CREATE];
+ oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+
+ /* if this is directory, then we expect . and ..
+ * to be inserted as well */
+ OSD_DECLARE_OP(oh, insert);
+ OSD_DECLARE_OP(oh, insert);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+ return 0;
+}
+
static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
struct lu_attr *attr,
struct dt_allocation_hint *hint,
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
+ OSD_EXEC_OP(th, create);
+
result = __osd_object_create(info, obj, attr, hint, dof, th);
if (result == 0)
result = __osd_oi_insert(env, obj, fid, th);
}
/**
+ * Called to destroy on-disk representation of the object
+ *
+ * Concurrency: must be locked
+ */
+static int osd_declare_object_destroy(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th)
+{
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct inode *inode = obj->oo_inode;
+ struct osd_thandle *oh;
+ ENTRY;
+
+ oh = container_of0(th, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+ LASSERT(inode);
+ LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
+
+ OSD_DECLARE_OP(oh, destroy);
+ OSD_DECLARE_OP(oh, delete);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_OBJECT_DELETE];
+ oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+ RETURN(0);
+}
+
+static int osd_object_destroy(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th)
+{
+ const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct inode *inode = obj->oo_inode;
+ struct osd_device *osd = osd_obj2dev(obj);
+ struct osd_thandle *oh;
+ int result;
+ ENTRY;
+
+ oh = container_of0(th, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle);
+ LASSERT(inode);
+ LASSERT(osd_inode_unlinked(inode));
+
+ OSD_EXEC_OP(th, destroy);
+
+ result = osd_oi_delete(osd_oti_get(env), &osd->od_oi, fid, th);
+
+ /* XXX: add to ext3 orphan list */
+ /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
+
+ /* not needed in the cache anymore */
+ set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
+
+ RETURN(0);
+}
+
+/**
* Helper function for osd_xattr_set()
*/
static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
+ OSD_EXEC_OP(th, create);
+
result = __osd_object_create(info, obj, attr, hint, dof, th);
/* objects under osd root shld have igif fid, so dont add fid EA */
RETURN(result);
}
+static int osd_declare_object_ref_add(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ /* it's possible that object doesn't exist yet */
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, ref_add);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+ return 0;
+}
+
/*
* Concurrency: @dt is write locked.
*/
-static void osd_object_ref_add(const struct lu_env *env,
- struct dt_object *dt,
- struct thandle *th)
+static int osd_object_ref_add(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th)
{
struct osd_object *obj = osd_dt_obj(dt);
struct inode *inode = obj->oo_inode;
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
+ OSD_EXEC_OP(th, ref_add);
+
cfs_spin_lock(&obj->oo_guard);
LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
inode->i_nlink++;
cfs_spin_unlock(&obj->oo_guard);
inode->i_sb->s_op->dirty_inode(inode);
LINVRNT(osd_invariant(obj));
+
+ return 0;
+}
+
+static int osd_declare_object_ref_del(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(dt_object_exists(dt));
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, ref_del);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_ATTR_SET_BASE];
+
+ return 0;
}
/*
* Concurrency: @dt is write locked.
*/
-static void osd_object_ref_del(const struct lu_env *env,
- struct dt_object *dt,
- struct thandle *th)
+static int osd_object_ref_del(const struct lu_env *env,
+ struct dt_object *dt,
+ struct thandle *th)
{
struct osd_object *obj = osd_dt_obj(dt);
struct inode *inode = obj->oo_inode;
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
+ OSD_EXEC_OP(th, ref_del);
+
cfs_spin_lock(&obj->oo_guard);
LASSERT(inode->i_nlink > 0);
inode->i_nlink--;
cfs_spin_unlock(&obj->oo_guard);
inode->i_sb->s_op->dirty_inode(inode);
LINVRNT(osd_invariant(obj));
+
+ return 0;
+}
+
+/*
+ * Get the 64-bit version for an inode.
+ */
+static int osd_object_version_get(const struct lu_env *env,
+ struct dt_object *dt, dt_obj_version_t *ver)
+{
+ struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+ CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n",
+ LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+ *ver = LDISKFS_I(inode)->i_fs_version;
+ return 0;
}
/*
struct osd_thread_info *info = osd_oti_get(env);
struct dentry *dentry = &info->oti_obj_dentry;
+ /* version get is not real XATTR but uses xattr API */
+ if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+ /* for version we are just using xattr API but change inode
+ * field instead */
+ LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
+ osd_object_version_get(env, dt, buf->lb_buf);
+ return sizeof(dt_obj_version_t);
+ }
+
LASSERT(dt_object_exists(dt));
LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
}
+
+static int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, const char *name,
+ int fl, struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(handle != NULL);
+
+ if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+ /* no credits for version */
+ return 0;
+ }
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, xattr_set);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
+
+ return 0;
+}
+
+/*
+ * Set the 64-bit version for object
+ */
+static void osd_object_version_set(const struct lu_env *env,
+ struct dt_object *dt,
+ dt_obj_version_t *new_version)
+{
+ struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+ CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n",
+ *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+
+ LDISKFS_I(inode)->i_fs_version = *new_version;
+ /** Version is set after all inode operations are finished,
+ * so we should mark it dirty here */
+ inode->i_sb->s_op->dirty_inode(inode);
+}
+
/*
* Concurrency: @dt is write locked.
*/
{
LASSERT(handle != NULL);
+ /* version set is not real XATTR */
+ if (strcmp(name, XATTR_NAME_VERSION) == 0) {
+ /* for version we are just using xattr API but change inode
+ * field instead */
+ LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
+ osd_object_version_set(env, dt, buf->lb_buf);
+ return sizeof(dt_obj_version_t);
+ }
+
if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
return -EACCES;
+ OSD_EXEC_OP(handle, xattr_set);
return __osd_xattr_set(env, dt, buf, name, fl);
}
return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
}
+static int osd_declare_xattr_del(const struct lu_env *env,
+ struct dt_object *dt,
+ const char *name,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(dt_object_exists(dt));
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, xattr_set);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_XATTR_SET];
+
+ return 0;
+}
+
/*
* Concurrency: @dt is write locked.
*/
if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
return -EACCES;
+ OSD_EXEC_OP(handle, xattr_set);
+
dentry->d_inode = inode;
rc = inode->i_op->removexattr(dentry, name);
return rc;
RETURN(rc);
}
-/*
- * Get the 64-bit version for an inode.
- */
-static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
- struct dt_object *dt)
-{
- struct inode *inode = osd_dt_obj(dt)->oo_inode;
-
- CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
- LDISKFS_I(inode)->i_fs_version, inode->i_ino);
- return LDISKFS_I(inode)->i_fs_version;
-}
-
-/*
- * Set the 64-bit version and return the old version.
- */
-static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
- dt_obj_version_t new_version)
-{
- struct inode *inode = osd_dt_obj(dt)->oo_inode;
-
- CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
- new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
- LDISKFS_I(inode)->i_fs_version = new_version;
- /** Version is set after all inode operations are finished,
- * so we should mark it dirty here */
- inode->i_sb->s_op->dirty_inode(inode);
-}
-
static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
void **data)
{
}
static const struct dt_object_operations osd_obj_ops = {
- .do_read_lock = osd_object_read_lock,
- .do_write_lock = osd_object_write_lock,
- .do_read_unlock = osd_object_read_unlock,
- .do_write_unlock = osd_object_write_unlock,
- .do_write_locked = osd_object_write_locked,
- .do_attr_get = osd_attr_get,
- .do_attr_set = osd_attr_set,
- .do_ah_init = osd_ah_init,
- .do_create = osd_object_create,
- .do_index_try = osd_index_try,
- .do_ref_add = osd_object_ref_add,
- .do_ref_del = osd_object_ref_del,
- .do_xattr_get = osd_xattr_get,
- .do_xattr_set = osd_xattr_set,
- .do_xattr_del = osd_xattr_del,
- .do_xattr_list = osd_xattr_list,
- .do_capa_get = osd_capa_get,
- .do_object_sync = osd_object_sync,
- .do_version_get = osd_object_version_get,
- .do_version_set = osd_object_version_set,
- .do_data_get = osd_data_get,
+ .do_read_lock = osd_object_read_lock,
+ .do_write_lock = osd_object_write_lock,
+ .do_read_unlock = osd_object_read_unlock,
+ .do_write_unlock = osd_object_write_unlock,
+ .do_write_locked = osd_object_write_locked,
+ .do_attr_get = osd_attr_get,
+ .do_declare_attr_set = osd_declare_attr_set,
+ .do_attr_set = osd_attr_set,
+ .do_ah_init = osd_ah_init,
+ .do_declare_create = osd_declare_object_create,
+ .do_create = osd_object_create,
+ .do_declare_destroy = osd_declare_object_destroy,
+ .do_destroy = osd_object_destroy,
+ .do_index_try = osd_index_try,
+ .do_declare_ref_add = osd_declare_object_ref_add,
+ .do_ref_add = osd_object_ref_add,
+ .do_declare_ref_del = osd_declare_object_ref_del,
+ .do_ref_del = osd_object_ref_del,
+ .do_xattr_get = osd_xattr_get,
+ .do_declare_xattr_set = osd_declare_xattr_set,
+ .do_xattr_set = osd_xattr_set,
+ .do_declare_xattr_del = osd_declare_xattr_del,
+ .do_xattr_del = osd_xattr_del,
+ .do_xattr_list = osd_xattr_list,
+ .do_capa_get = osd_capa_get,
+ .do_object_sync = osd_object_sync,
+ .do_data_get = osd_data_get,
};
/**
* (i.e. to run 2.0 mds on 1.8 disk) (b11826)
*/
static const struct dt_object_operations osd_obj_ea_ops = {
- .do_read_lock = osd_object_read_lock,
- .do_write_lock = osd_object_write_lock,
- .do_read_unlock = osd_object_read_unlock,
- .do_write_unlock = osd_object_write_unlock,
- .do_write_locked = osd_object_write_locked,
- .do_attr_get = osd_attr_get,
- .do_attr_set = osd_attr_set,
- .do_ah_init = osd_ah_init,
- .do_create = osd_object_ea_create,
- .do_index_try = osd_index_try,
- .do_ref_add = osd_object_ref_add,
- .do_ref_del = osd_object_ref_del,
- .do_xattr_get = osd_xattr_get,
- .do_xattr_set = osd_xattr_set,
- .do_xattr_del = osd_xattr_del,
- .do_xattr_list = osd_xattr_list,
- .do_capa_get = osd_capa_get,
- .do_object_sync = osd_object_sync,
- .do_version_get = osd_object_version_get,
- .do_version_set = osd_object_version_set,
- .do_data_get = osd_data_get,
+ .do_read_lock = osd_object_read_lock,
+ .do_write_lock = osd_object_write_lock,
+ .do_read_unlock = osd_object_read_unlock,
+ .do_write_unlock = osd_object_write_unlock,
+ .do_write_locked = osd_object_write_locked,
+ .do_attr_get = osd_attr_get,
+ .do_declare_attr_set = osd_declare_attr_set,
+ .do_attr_set = osd_attr_set,
+ .do_ah_init = osd_ah_init,
+ .do_declare_create = osd_declare_object_create,
+ .do_create = osd_object_ea_create,
+ .do_declare_destroy = osd_declare_object_destroy,
+ .do_destroy = osd_object_destroy,
+ .do_index_try = osd_index_try,
+ .do_declare_ref_add = osd_declare_object_ref_add,
+ .do_ref_add = osd_object_ref_add,
+ .do_declare_ref_del = osd_declare_object_ref_del,
+ .do_ref_del = osd_object_ref_del,
+ .do_xattr_get = osd_xattr_get,
+ .do_declare_xattr_set = osd_declare_xattr_set,
+ .do_xattr_set = osd_xattr_set,
+ .do_declare_xattr_del = osd_declare_xattr_del,
+ .do_xattr_del = osd_xattr_del,
+ .do_xattr_list = osd_xattr_list,
+ .do_capa_get = osd_capa_get,
+ .do_object_sync = osd_object_sync,
+ .do_data_get = osd_data_get,
};
/*
return err;
}
+static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
+ const loff_t size, loff_t pos,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, write);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_WRITE_BLOCK];
+
+ return 0;
+}
+
static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, loff_t *pos,
struct thandle *handle, struct lustre_capa *capa,
return result;
}
+/*
+ * in some cases we may need declare methods for objects being created
+ * e.g., when we create symlink
+ */
+static const struct dt_body_operations osd_body_ops_new = {
+ .dbo_declare_write = osd_declare_write,
+};
+
static const struct dt_body_operations osd_body_ops = {
- .dbo_read = osd_read,
- .dbo_write = osd_write
+ .dbo_read = osd_read,
+ .dbo_declare_write = osd_declare_write,
+ .dbo_write = osd_write
};
+static int osd_index_declare_iam_delete(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_key *key,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, delete);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+ return 0;
+}
/**
* delete a (key, value) pair from index \a dt specified by \a key
if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
RETURN(-EACCES);
+ OSD_EXEC_OP(handle, delete);
+
ipd = osd_idx_ipd_get(env, bag);
if (unlikely(ipd == NULL))
RETURN(-ENOMEM);
RETURN(rc);
}
+static int osd_index_declare_ea_delete(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_key *key,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(dt_object_exists(dt));
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, delete);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_DELETE];
+
+ return 0;
+}
+
static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
struct dt_rec *fid)
{
LASSERT(dt_object_exists(dt));
LASSERT(handle != NULL);
+ OSD_EXEC_OP(handle, delete);
+
oh = container_of(handle, struct osd_thandle, ot_super);
LASSERT(oh->ot_handle != NULL);
LASSERT(oh->ot_handle->h_transaction != NULL);
RETURN(rc);
}
+static int osd_index_declare_iam_insert(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_rec *rec,
+ const struct dt_key *key,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(dt_object_exists(dt));
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, insert);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+
+ return 0;
+}
+
/**
* Inserts (key, value) pair in \a dt index object.
*
if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
return -EACCES;
+ OSD_EXEC_OP(th, insert);
+
ipd = osd_idx_ipd_get(env, bag);
if (unlikely(ipd == NULL))
RETURN(-ENOMEM);
lu_object_put(env, &obj->oo_dt.do_lu);
}
+static int osd_index_declare_ea_insert(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_rec *rec,
+ const struct dt_key *key,
+ struct thandle *handle)
+{
+ struct osd_thandle *oh;
+
+ LASSERT(dt_object_exists(dt));
+ LASSERT(handle != NULL);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle == NULL);
+
+ OSD_DECLARE_OP(oh, insert);
+ oh->ot_credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
+
+ return 0;
+}
+
/**
* Index add function for interoperability mode (b11826).
* It will add the directory entry.This entry is needed to
*/
static int osd_it_iam_rec(const struct lu_env *env,
const struct dt_it *di,
- struct lu_dirent *lde,
+ struct dt_rec *dtrec,
__u32 attr)
{
struct osd_it_iam *it = (struct osd_it_iam *)di;
struct osd_thread_info *info = osd_oti_get(env);
struct lu_fid *fid = &info->oti_fid;
const struct osd_fid_pack *rec;
+ struct lu_dirent *lde = (struct lu_dirent *)dtrec;
char *name;
int namelen;
__u64 hash;
}
static const struct dt_index_operations osd_index_iam_ops = {
- .dio_lookup = osd_index_iam_lookup,
- .dio_insert = osd_index_iam_insert,
- .dio_delete = osd_index_iam_delete,
+ .dio_lookup = osd_index_iam_lookup,
+ .dio_declare_insert = osd_index_declare_iam_insert,
+ .dio_insert = osd_index_iam_insert,
+ .dio_declare_delete = osd_index_declare_iam_delete,
+ .dio_delete = osd_index_iam_delete,
.dio_it = {
.init = osd_it_iam_init,
.fini = osd_it_iam_fini,
*/
static inline int osd_it_ea_rec(const struct lu_env *env,
const struct dt_it *di,
- struct lu_dirent *lde,
+ struct dt_rec *dtrec,
__u32 attr)
{
struct osd_it_ea *it = (struct osd_it_ea *)di;
struct osd_object *obj = it->oie_obj;
struct lu_fid *fid = &it->oie_dirent->oied_fid;
+ struct lu_dirent *lde = (struct lu_dirent *)dtrec;
int rc = 0;
ENTRY;
* mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
*/
static const struct dt_index_operations osd_index_ea_ops = {
- .dio_lookup = osd_index_ea_lookup,
- .dio_insert = osd_index_ea_insert,
- .dio_delete = osd_index_ea_delete,
+ .dio_lookup = osd_index_ea_lookup,
+ .dio_declare_insert = osd_index_declare_ea_insert,
+ .dio_insert = osd_index_ea_insert,
+ .dio_declare_delete = osd_index_declare_ea_delete,
+ .dio_delete = osd_index_ea_delete,
.dio_it = {
.init = osd_it_ea_init,
.fini = osd_it_ea_fini,
/*
* XXX temporary: for ->i_op calls.
*/
- struct txn_param oti_txn;
struct timespec oti_time;
/*
* XXX temporary: fake struct file for osd_object_sync
/*
* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ * Copyright (c) 2011 Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
const struct lu_buf *buf, loff_t *off, int sync)
{
struct thandle *th;
- struct txn_param p;
- int rc, credits;
+ int rc;
ENTRY;
- credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom,
- DTO_WRITE_BLOCK);
- txn_param_init(&p, credits);
-
- th = dt_trans_start(env, lut->lut_bottom, &p);
+ th = dt_trans_create(env, lut->lut_bottom);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
+ rc = dt_declare_record_write(env, lut->lut_last_rcvd,
+ buf->lb_len, *off, th);
+ if (rc)
+ goto stop;
+
+ rc = dt_trans_start(env, lut->lut_bottom, th);
+ if (rc)
+ goto stop;
+
rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th);
+
+stop:
dt_trans_stop(env, lut->lut_bottom, th);
CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
/**
* Update server data in last_rcvd
*/
-static int lut_server_data_update(const struct lu_env *env,
- struct lu_target *lut, int sync)
+int lut_server_data_update(const struct lu_env *env,
+ struct lu_target *lut, int sync)
{
struct lr_server_data tmp_lsd;
loff_t tmp_off = 0;
test_85b() { #bug 16774
lctl set_param -n ldlm.cancel_unused_locks_before_replay "1"
- lfs setstripe -o 0 -c 1 $DIR
+ do_facet mgs $LCTL pool_new $FSNAME.$TESTNAME || return 1
+ do_facet mgs $LCTL pool_add $FSNAME.$TESTNAME $FSNAME-OST0000 || return 2
+
+ lfs setstripe -c 1 -p $FSNAME.$TESTNAME $DIR
for i in `seq 100`; do
dd if=/dev/urandom of=$DIR/$tfile-$i bs=4096 count=32 >/dev/null 2>&1
addr=`echo $lov_id | awk '{print $4}' | awk -F '-' '{print $3}'`
count=`lctl get_param -n ldlm.namespaces.*OST0000*$addr.lock_unused_count`
echo "before recovery: unused locks count = $count"
+ [ $count != 0 ] || return 3
fail ost1
count2=`lctl get_param -n ldlm.namespaces.*OST0000*$addr.lock_unused_count`
echo "after recovery: unused locks count = $count2"
+ do_facet mgs $LCTL pool_remove $FSNAME.$TESTNAME $FSNAME-OST0000 || return 4
+ do_facet mgs $LCTL pool_destroy $FSNAME.$TESTNAME || return 5
+
if [ $count2 -ge $count ]; then
error "unused locks are not canceled"
fi