OUT_INDEX_DELETE = 11,
OUT_WRITE = 12,
OUT_XATTR_DEL = 13,
+ OUT_PUNCH = 14,
OUT_LAST
};
const struct lu_fid *fid,
const struct lu_buf *buf,
__u64 pos);
+int update_records_punch_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ __u64 start, __u64 end);
int tur_update_records_extend(struct thandle_update_records *tur,
size_t new_size);
return top_trans_start(env, dt2lod_dev(dt)->lod_child, th);
}
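+
+/**
+ * Implementation of dt_device_operations::dt_trans_cb_add() for LOD
+ *
+ * Register a commit callback on the master sub thandle, so the callback
+ * is invoked once the master sub transaction commits.
+ *
+ * \param[in] th top transaction handle
+ * \param[in] dcb commit callback to be added
+ *
+ * \retval the return value of dt_trans_cb_add() on the master
+ * sub thandle.
+ */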
+static int lod_trans_cb_add(struct thandle *th,
+ struct dt_txn_commit_cb *dcb)
+{
+ struct top_thandle *top_th = container_of(th, struct top_thandle,
+ tt_super);
+ return dt_trans_cb_add(top_th->tt_master_sub_thandle, dcb);
+}
+
/**
* Implementation of dt_device_operations::dt_trans_stop() for LOD
*
.dt_sync = lod_sync,
.dt_ro = lod_ro,
.dt_commit_async = lod_commit_async,
+ .dt_trans_cb_add = lod_trans_cb_add,
};
/**
ssize_t lod_sub_object_write(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, loff_t *pos,
struct thandle *th, int rq);
+int lod_sub_object_declare_punch(const struct lu_env *env,
+ struct dt_object *dt,
+ __u64 start, __u64 end,
+ struct thandle *th);
+int lod_sub_object_punch(const struct lu_env *env, struct dt_object *dt,
+ __u64 start, __u64 end, struct thandle *th);
#endif
static const char dotdot[] = "..";
static const struct dt_body_operations lod_body_lnk_ops;
+static const struct dt_body_operations lod_body_ops;
/**
* Implementation of dt_index_operations::dio_lookup
if (dof->dof_type == DFT_SYM)
dt->do_body_ops = &lod_body_lnk_ops;
+ else if (dof->dof_type == DFT_REGULAR)
+ dt->do_body_ops = &lod_body_ops;
/*
* it's lod_ah_init() that has decided the object will be striped
return lod_sub_object_write(env, dt_object_child(dt), buf, pos, th, iq);
}
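+
+/**
+ * Implementation of dt_body_operations::dbo_declare_punch() for LOD
+ *
+ * Punch is not supported on remote objects; otherwise the declaration
+ * is passed on to the local sub object.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt object to be punched
+ * \param[in] start start offset of punch
+ * \param[in] end end offset of punch
+ * \param[in] th transaction handle
+ *
+ * \retval 0 if the declaration succeeds.
+ * \retval negative errno if the declaration fails.
+ */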
+static int lod_declare_punch(const struct lu_env *env, struct dt_object *dt,
+ __u64 start, __u64 end, struct thandle *th)
+{
+ if (dt_object_remote(dt))
+ return -ENOTSUPP;
+
+ return lod_sub_object_declare_punch(env, dt_object_child(dt), start,
+ end, th);
+}
+
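+/**
+ * Implementation of dt_body_operations::dbo_punch() for LOD
+ *
+ * Punch is not supported on remote objects; otherwise the punch is
+ * passed on to the local sub object.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt object to be punched
+ * \param[in] start start offset of punch
+ * \param[in] end end offset of punch
+ * \param[in] th transaction handle
+ *
+ * \retval 0 if the punch succeeds.
+ * \retval negative errno if the punch fails.
+ */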
+static int lod_punch(const struct lu_env *env, struct dt_object *dt,
+ __u64 start, __u64 end, struct thandle *th)
+{
+ if (dt_object_remote(dt))
+ return -ENOTSUPP;
+
+ return lod_sub_object_punch(env, dt_object_child(dt), start, end, th);
+}
+
static const struct dt_body_operations lod_body_lnk_ops = {
.dbo_read = lod_read,
.dbo_declare_write = lod_declare_write,
.dbo_write = lod_write
};
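+
+/* Body operations for regular files, including punch support */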
+static const struct dt_body_operations lod_body_ops = {
+ .dbo_read = lod_read,
+ .dbo_declare_write = lod_declare_write,
+ .dbo_write = lod_write,
+ .dbo_declare_punch = lod_declare_punch,
+ .dbo_punch = lod_punch,
+};
+
/**
* Implementation of lu_object_operations::loo_object_init.
*
*/
static int lod_object_start(const struct lu_env *env, struct lu_object *o)
{
- if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
+ if (S_ISLNK(o->lo_header->loh_attr & S_IFMT)) {
lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
+ } else if (S_ISREG(o->lo_header->loh_attr & S_IFMT) ||
+ fid_is_local_file(lu_object_fid(o))) {
+ /* Note: some local files (e.g. last_rcvd) are created
+ * through the bottom layer (OSD), so when object
+ * initialization reaches LOD, loh_attr has not been set
+ * yet; set do_body_ops for local files anyway */
+ lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_ops;
+ }
return 0;
}
rc = dt_write(env, dt, buf, pos, sub_th, rq);
RETURN(rc);
}
+
+/**
+ * Declare punch
+ *
+ * Get the transaction handle of the next layer and declare the punch.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt object to be punched
+ * \param[in] start start offset of punch
+ * \param[in] end end offset of punch
+ * \param[in] th transaction handle
+ *
+ * \retval 0 if the declaration succeeds.
+ * \retval negative errno if the declaration fails.
+ */
+int lod_sub_object_declare_punch(const struct lu_env *env,
+ struct dt_object *dt,
+ __u64 start, __u64 end,
+ struct thandle *th)
+{
+ struct thandle *sub_th;
+ int rc;
+ ENTRY;
+
+ sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ if (IS_ERR(sub_th))
+ RETURN(PTR_ERR(sub_th));
+
+ rc = dt_declare_punch(env, dt, start, end, sub_th);
+
+ RETURN(rc);
+}
+
+/**
+ * Punch sub object
+ *
+ * Get the transaction handle of the next layer, record the punch update
+ * if it is part of a cross-MDT operation, then punch the object.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt object to be punched
+ * \param[in] start start offset of punch
+ * \param[in] end end offset of punch
+ * \param[in] th transaction handle
+ *
+ * \retval 0 if the punch succeeds.
+ * \retval negative errno if it fails.
+ */
+int lod_sub_object_punch(const struct lu_env *env, struct dt_object *dt,
+ __u64 start, __u64 end, struct thandle *th)
+{
+ struct thandle *sub_th;
+ bool record_update;
+ int rc;
+ ENTRY;
+
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
+ if (IS_ERR(sub_th))
+ RETURN(PTR_ERR(sub_th));
+
+ if (record_update) {
+ rc = update_record_pack(punch, th, lu_object_fid(&dt->do_lu),
+ start, end);
+ if (rc < 0)
+ RETURN(rc);
+ }
+
+ rc = dt_punch(env, dt, start, end, sub_th);
+
+ RETURN(rc);
+}
int dt_txn_hook_start(const struct lu_env *env,
struct dt_device *dev, struct thandle *th)
{
- int rc = 0;
- struct dt_txn_callback *cb;
+ int rc = 0;
+ struct dt_txn_callback *cb;
- if (th->th_local)
- return 0;
+ if (th->th_local)
+ return 0;
list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
- if (cb->dtc_txn_start == NULL ||
- !(cb->dtc_tag & env->le_ctx.lc_tags))
- continue;
- rc = cb->dtc_txn_start(env, th, cb->dtc_cookie);
- if (rc < 0)
- break;
- }
- return rc;
+ struct thandle *dtc_th = th;
+
+ if (cb->dtc_txn_start == NULL ||
+ !(cb->dtc_tag & env->le_ctx.lc_tags))
+ continue;
+
+ /* Usually dt_txn_hook_start is called from the bottom
+ * device; if the thandle has th_top set, use the top
+ * thandle for callbacks in the top thandle layer */
+ if (th->th_top != NULL)
+ dtc_th = th->th_top;
+
+ rc = cb->dtc_txn_start(env, dtc_th, cb->dtc_cookie);
+ if (rc < 0)
+ break;
+ }
+ return rc;
}
EXPORT_SYMBOL(dt_txn_hook_start);
-int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn)
+int dt_txn_hook_stop(const struct lu_env *env, struct thandle *th)
{
- struct dt_device *dev = txn->th_dev;
- struct dt_txn_callback *cb;
- int rc = 0;
+ struct dt_device *dev = th->th_dev;
+ struct dt_txn_callback *cb;
+ int rc = 0;
- if (txn->th_local)
- return 0;
+ if (th->th_local)
+ return 0;
list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
- if (cb->dtc_txn_stop == NULL ||
- !(cb->dtc_tag & env->le_ctx.lc_tags))
- continue;
- rc = cb->dtc_txn_stop(env, txn, cb->dtc_cookie);
- if (rc < 0)
- break;
- }
- return rc;
+ struct thandle *dtc_th = th;
+
+ if (cb->dtc_txn_stop == NULL ||
+ !(cb->dtc_tag & env->le_ctx.lc_tags))
+ continue;
+
+ /* Usually dt_txn_hook_stop is called from the bottom
+ * device; if the thandle has th_top set, use the top
+ * thandle for callbacks in the top thandle layer */
+ if (th->th_top != NULL)
+ dtc_th = th->th_top;
+
+ rc = cb->dtc_txn_stop(env, dtc_th, cb->dtc_cookie);
+ if (rc < 0)
+ break;
+ }
+ return rc;
}
EXPORT_SYMBOL(dt_txn_hook_stop);
-void dt_txn_hook_commit(struct thandle *txn)
+void dt_txn_hook_commit(struct thandle *th)
{
struct dt_txn_callback *cb;
- if (txn->th_local)
+ if (th->th_local)
return;
- list_for_each_entry(cb, &txn->th_dev->dd_txn_callbacks,
+ list_for_each_entry(cb, &th->th_dev->dd_txn_callbacks,
dtc_linkage) {
+ /* Right now the bottom device (OSD) uses this commit hook
+ * to notify OSP, so the thandle is not replaced with the
+ * top thandle here */
if (cb->dtc_txn_commit)
- cb->dtc_txn_commit(txn, cb->dtc_cookie);
+ cb->dtc_txn_commit(th, cb->dtc_cookie);
}
}
EXPORT_SYMBOL(dt_txn_hook_commit);
[OUT_INDEX_DELETE] = "delete",
[OUT_WRITE] = "write",
[OUT_XATTR_DEL] = "xattr_del",
+ [OUT_PUNCH] = "punch",
};
if (opc < ARRAY_SIZE(opc_str) && opc_str[opc] != NULL)
struct thandle *th)
{
struct tgt_thread_info *tti = tgt_th_info(env);
+ struct dt_object *dto;
lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
lcd->lcd_last_close_result =
lcd_cpu_to_le(lcd, &tti->tti_lcd);
tti_buf_lcd(tti);
- return dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf, off, th);
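+ /* The thandle may belong to a sub device of a distributed
+ * transaction, so write through the slice of last_rcvd that
+ * lives on the same device as the thandle */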
+ dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
+ return dt_record_write(env, dto, &tti->tti_buf, off, th);
}
/**
struct thandle *th)
{
struct tgt_thread_info *tti = tgt_th_info(env);
+ struct dt_object *dto;
int rc;
ENTRY;
tti_buf_lsd(tti);
lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
- rc = dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf,
- &tti->tti_off, th);
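+ /* write through the last_rcvd slice on the thandle's device */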
+ dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
+ rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
"last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
spin_unlock(&tgt->lut_translock);
/** VBR: set new versions */
- if (th->th_result == 0 && obj != NULL)
- dt_version_set(env, obj, tti->tti_transno, th);
+ if (th->th_result == 0 && obj != NULL) {
+ struct dt_object *dto = dt_object_locate(obj, th->th_dev);
+ dt_version_set(env, dto, tti->tti_transno, th);
+ }
/* filling reply data */
CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
struct lu_target *tgt = cookie;
struct tgt_session_info *tsi;
struct tgt_thread_info *tti = tgt_th_info(env);
+ struct dt_object *dto;
int rc;
/* if there is no session, then this transaction is not result of
if (tsi->tsi_exp == NULL)
return 0;
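+ /* declare against the last_rcvd slice on the thandle's device */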
+ dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
tti_buf_lcd(tti);
- rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
- &tti->tti_buf,
+
+ rc = dt_declare_record_write(env, dto, &tti->tti_buf,
tsi->tsi_exp->exp_target_data.ted_lr_off,
th);
if (rc)
return rc;
tti_buf_lsd(tti);
- rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
- &tti->tti_buf, 0, th);
+ rc = dt_declare_record_write(env, dto, &tti->tti_buf, 0, th);
if (rc)
return rc;
if (tsi->tsi_vbr_obj != NULL &&
- !lu_object_remote(&tsi->tsi_vbr_obj->do_lu))
- rc = dt_declare_version_set(env, tsi->tsi_vbr_obj, th);
+ !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
+ dto = dt_object_locate(tsi->tsi_vbr_obj, th->th_dev);
+ rc = dt_declare_version_set(env, dto, th);
+ }
return rc;
}
pos = cpu_to_le64(pos);
- return update_records_update_pack(env, fid, OUT_XATTR_DEL, ops,
+ return update_records_update_pack(env, fid, OUT_WRITE, ops,
op_count, max_ops_size, params,
param_count, max_param_size,
2, bufs, sizes);
EXPORT_SYMBOL(update_records_write_pack);
/**
+ * Pack punch
+ *
+ * Pack punch update into update records.
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_ops_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid FID of the object to punch
+ * \param[in] start start offset of punch
+ * \param[in] end end offset of punch
+ *
+ * \retval 0 if packing succeeds.
+ * \retval negative errno if packing fails.
+ */
+int update_records_punch_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ __u64 start, __u64 end)
+{
+ size_t sizes[2] = {sizeof(start), sizeof(end)};
+ const void *bufs[2] = {&start, &end};
+
+ start = cpu_to_le64(start);
+ end = cpu_to_le64(end);
+
+ return update_records_update_pack(env, fid, OUT_PUNCH, ops, op_count,
+ max_ops_size, params, param_count,
+ max_param_size, 2, bufs, sizes);
+}
+EXPORT_SYMBOL(update_records_punch_pack);
+
+/**
* Create update records in thandle_update_records
*
* Allocate update_records for thandle_update_records, the initial size
record = &tur->tur_update_records->lur_update_rec;
update_records_dump(record, D_INFO, false);
}
+ top_th->tt_update_records = NULL;
}
LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
struct dt_device *sub_dt)
{
struct sub_thandle *lst;
- struct top_thandle *top_th = container_of(th, struct top_thandle,
- tt_super);
+ struct top_thandle *top_th;
struct thandle *sub_th;
ENTRY;
+ top_th = container_of(th, struct top_thandle, tt_super);
LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
LASSERT(top_th->tt_master_sub_thandle != NULL);
if (likely(sub_dt == top_th->tt_master_sub_thandle->th_dev))