#define TRANS_COMMIT_CB_MAGIC 0xa0a00a0a
#define MAX_COMMIT_CB_STR_LEN 32
-#define DCB_TRANS_NOT_COMMITTED 0x1
+#define DCB_TRANS_STOP 0x1
struct dt_txn_commit_cb {
struct list_head dcb_linkage;
dt_cb_t dcb_func;
__u64 la_valid;
};
-static inline void lu_attr_cpu_to_le(struct lu_attr *dst_attr,
- struct lu_attr *src_attr)
-{
- dst_attr->la_size = cpu_to_le64(src_attr->la_size);
- dst_attr->la_mtime = cpu_to_le64(src_attr->la_mtime);
- dst_attr->la_atime = cpu_to_le64(src_attr->la_atime);
- dst_attr->la_ctime = cpu_to_le64(src_attr->la_ctime);
- dst_attr->la_blocks = cpu_to_le64(src_attr->la_blocks);
- dst_attr->la_mode = cpu_to_le32(src_attr->la_mode);
- dst_attr->la_uid = cpu_to_le32(src_attr->la_uid);
- dst_attr->la_gid = cpu_to_le32(src_attr->la_gid);
- dst_attr->la_flags = cpu_to_le32(src_attr->la_flags);
- dst_attr->la_nlink = cpu_to_le32(src_attr->la_nlink);
- dst_attr->la_blkbits = cpu_to_le32(src_attr->la_blkbits);
- dst_attr->la_blksize = cpu_to_le32(src_attr->la_blksize);
- dst_attr->la_rdev = cpu_to_le32(src_attr->la_rdev);
- dst_attr->la_valid = cpu_to_le64(src_attr->la_valid);
-}
-
-static inline void lu_attr_le_to_cpu(struct lu_attr *dst_attr,
- struct lu_attr *src_attr)
-{
- dst_attr->la_size = le64_to_cpu(src_attr->la_size);
- dst_attr->la_mtime = le64_to_cpu(src_attr->la_mtime);
- dst_attr->la_atime = le64_to_cpu(src_attr->la_atime);
- dst_attr->la_ctime = le64_to_cpu(src_attr->la_ctime);
- dst_attr->la_blocks = le64_to_cpu(src_attr->la_blocks);
- dst_attr->la_mode = le32_to_cpu(src_attr->la_mode);
- dst_attr->la_uid = le32_to_cpu(src_attr->la_uid);
- dst_attr->la_gid = le32_to_cpu(src_attr->la_gid);
- dst_attr->la_flags = le32_to_cpu(src_attr->la_flags);
- dst_attr->la_nlink = le32_to_cpu(src_attr->la_nlink);
- dst_attr->la_blkbits = le32_to_cpu(src_attr->la_blkbits);
- dst_attr->la_blksize = le32_to_cpu(src_attr->la_blksize);
- dst_attr->la_rdev = le32_to_cpu(src_attr->la_rdev);
- dst_attr->la_valid = le64_to_cpu(src_attr->la_valid);
-}
-
/** Bit-mask of valid attributes */
enum la_valid {
LA_ATIME = 1 << 0,
__u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd);
struct distribute_txn_replay_req *
distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd);
-void dtrq_destory(struct distribute_txn_replay_req *dtrq);
+void dtrq_destroy(struct distribute_txn_replay_req *dtrq);
struct distribute_txn_replay_req_sub *
dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index);
/* In-memory descriptor for a log object or log catalog */
struct llog_handle {
struct rw_semaphore lgh_lock;
- spinlock_t lgh_hdr_lock; /* protect lgh_hdr data */
+ struct rw_semaphore lgh_hdr_lock; /* protect lgh_hdr data */
struct llog_logid lgh_id; /* id of this log */
struct llog_log_hdr *lgh_hdr;
struct dt_object *lgh_obj;
struct dt_rec;
struct object_update_param;
-struct update_buffer {
- struct object_update_request *ub_req;
- size_t ub_req_size;
-};
-
-/**
- * Tracking the updates being executed on this dt_device.
- */
-struct dt_update_request {
- struct dt_device *dur_dt;
- /* attached itself to thandle */
- int dur_flags;
- /* update request result */
- int dur_rc;
- /* Current batch(transaction) id */
- __u64 dur_batchid;
- /* Holding object updates */
- struct update_buffer dur_buf;
- struct list_head dur_cb_items;
-};
-
struct update_params {
struct object_update_param up_params[0];
};
if (size != NULL)
*size = param->oup_len;
- return ¶m->oup_buf[0];
+ return param->oup_buf;
}
struct update_op {
/* All of update records will packed here */
struct thandle_update_records *tmt_update_records;
+ wait_queue_head_t tmt_stop_waitq;
__u64 tmt_batchid;
int tmt_result;
__u32 tmt_magic;
struct sub_thandle {
struct thandle *st_sub_th;
struct dt_device *st_dt;
- struct list_head st_list;
struct llog_cookie st_cookie;
struct dt_txn_commit_cb st_commit_dcb;
+ struct dt_txn_commit_cb st_stop_dcb;
int st_result;
/* linked to top_thandle */
struct list_head st_sub_list;
/* If this sub thandle is committed */
- bool st_committed:1;
+ bool st_committed:1,
+ st_stopped:1;
};
struct tx_arg;
typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th,
struct tx_arg *ta);
-/* Structure for holding one update executation */
+/* Structure for holding one update execution */
struct tx_arg {
tx_exec_func_t exec_fn;
tx_exec_func_t undo_fn;
if (lut->lut_tdtd != NULL) {
struct target_distribute_txn_data *tdtd;
- __u64 update_transno;
tdtd = lut->lut_tdtd;
update_transno = distribute_txn_get_next_transno(lut->lut_tdtd);
} else if (obd->obd_recovery_expired) {
CDEBUG(D_HA, "waking for expired recovery\n");
wake_up = 1;
- } else if (req_transno == next_transno || (update_transno != 0 &&
- update_transno <= next_transno)) {
+ } else if (req_transno == next_transno ||
+ (update_transno != 0 && update_transno <= next_transno)) {
CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
wake_up = 1;
} else if (queue_len > 0 &&
obd->obd_next_recovery_transno = transno;
spin_unlock(&obd->obd_recovery_task_lock);
target_update_lcd(env, lut, dtrq);
- dtrq_destory(dtrq);
+ dtrq_destroy(dtrq);
} else {
spin_unlock(&obd->obd_recovery_task_lock);
LASSERT(list_empty(&obd->obd_req_replay_queue));
if (rec->lrh_len !=
llog_update_record_size((struct llog_update_record *)rec)) {
- CERROR("%s broken update record! index %u "DOSTID":%u : rc = %d\n",
- lod2obd(lrd->lrd_lod)->obd_name, index,
+ CERROR("%s broken update record! index %u "DOSTID":%u :"
+ " rc = %d\n", lod2obd(lrd->lrd_lod)->obd_name, index,
POSTID(&llh->lgh_id.lgl_oi), rec->lrh_index, -EIO);
return -EIO;
}
return NULL;
init_rwsem(&loghandle->lgh_lock);
- spin_lock_init(&loghandle->lgh_hdr_lock);
+ init_rwsem(&loghandle->lgh_hdr_lock);
INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
atomic_set(&loghandle->lgh_refcount, 1);
RETURN(-EINVAL);
}
- spin_lock(&loghandle->lgh_hdr_lock);
+ down_write(&loghandle->lgh_hdr_lock);
if (!ext2_clear_bit(index, llh->llh_bitmap)) {
- spin_unlock(&loghandle->lgh_hdr_lock);
+ up_write(&loghandle->lgh_hdr_lock);
CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n", index);
RETURN(-ENOENT);
}
if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
(llh->llh_count == 1) &&
(loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
- spin_unlock(&loghandle->lgh_hdr_lock);
+ up_write(&loghandle->lgh_hdr_lock);
rc = llog_destroy(env, loghandle);
if (rc < 0) {
CERROR("%s: can't destroy empty llog #"DOSTID
}
RETURN(LLOG_DEL_PLAIN);
}
- spin_unlock(&loghandle->lgh_hdr_lock);
+ up_write(&loghandle->lgh_hdr_lock);
rc = llog_write(env, loghandle, &llh->llh_hdr, LLOG_HEADER_IDX);
if (rc < 0) {
}
RETURN(0);
out_err:
- spin_lock(&loghandle->lgh_hdr_lock);
+ down_write(&loghandle->lgh_hdr_lock);
ext2_set_bit(index, llh->llh_bitmap);
llh->llh_count++;
- spin_unlock(&loghandle->lgh_hdr_lock);
+ up_write(&loghandle->lgh_hdr_lock);
return rc;
}
/* the lgh_hdr_lock protects llog header data from concurrent
* update/cancel, the llh_count and llh_bitmap are protected */
- spin_lock(&loghandle->lgh_hdr_lock);
+ down_write(&loghandle->lgh_hdr_lock);
if (ext2_set_bit(index, llh->llh_bitmap)) {
CERROR("%s: index %u already set in log bitmap\n",
o->do_lu.lo_dev->ld_obd->obd_name, index);
- spin_unlock(&loghandle->lgh_hdr_lock);
+ up_write(&loghandle->lgh_hdr_lock);
LBUG(); /* should never happen */
}
llh->llh_count++;
- spin_unlock(&loghandle->lgh_hdr_lock);
+
+ /* XXX This is a bit tricky. If the log object is local, the lock
+ * does not need to be held during the write here: if there is a
+ * race, the transaction (jbd2, what about ZFS?) will make sure the
+ * conflicting updates are all committed in the same transaction
+ * group. For a remote object the whole process must stay locked,
+ * so that the version of the remote transaction is set and the
+ * updates are sent in order. (see osp_md_write()) */
+ if (!dt_object_remote(o))
+ up_write(&loghandle->lgh_hdr_lock);
if (lgi->lgi_attr.la_size == 0) {
lgi->lgi_off = 0;
lgi->lgi_buf.lb_buf = &llh->llh_hdr;
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc != 0)
- GOTO(out, rc);
+ GOTO(out_remote_unlock, rc);
} else {
/* Note: If this is not initialization (size == 0), then do not
* write the whole header (8k bytes), only update header/tail
lgi->lgi_buf.lb_buf = &llh->llh_count;
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc != 0)
- GOTO(out, rc);
+ GOTO(out_remote_unlock, rc);
lgi->lgi_off = offsetof(typeof(*llh),
llh_bitmap[index / (sizeof(*llh->llh_bitmap) * 8)]);
&llh->llh_bitmap[index/(sizeof(*llh->llh_bitmap)*8)];
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc != 0)
- GOTO(out, rc);
+ GOTO(out_remote_unlock, rc);
lgi->lgi_off = offsetof(typeof(*llh), llh_tail);
lgi->lgi_buf.lb_len = sizeof(llh->llh_tail);
lgi->lgi_buf.lb_buf = &llh->llh_tail;
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc != 0)
- GOTO(out, rc);
+ GOTO(out_remote_unlock, rc);
}
+out_remote_unlock:
+ /* unlock here for remote object */
+ if (dt_object_remote(o))
+ up_write(&loghandle->lgh_hdr_lock);
+ if (rc)
+ GOTO(out, rc);
+
rc = dt_attr_get(env, o, &lgi->lgi_attr);
if (rc)
GOTO(out, rc);
RETURN(rc);
out:
/* cleanup llog for error case */
- spin_lock(&loghandle->lgh_hdr_lock);
+ down_write(&loghandle->lgh_hdr_lock);
ext2_clear_bit(index, llh->llh_bitmap);
llh->llh_count--;
- spin_unlock(&loghandle->lgh_hdr_lock);
+ up_write(&loghandle->lgh_hdr_lock);
/* restore llog last_idx */
loghandle->lgh_last_idx--;
dt_txn_hook_commit(th);
/* call per-transaction callbacks if any */
- list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) {
+ list_for_each_entry_safe(dcb, tmp, &oh->ot_commit_dcb_list,
+ dcb_linkage) {
LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
"commit callback entry: magic=%x name='%s'\n",
dcb->dcb_magic, dcb->dcb_name);
th->th_result = 0;
th->th_tags = LCT_TX_HANDLE;
oh->ot_credits = 0;
- INIT_LIST_HEAD(&oh->ot_dcb_list);
+ INIT_LIST_HEAD(&oh->ot_commit_dcb_list);
+ INIT_LIST_HEAD(&oh->ot_stop_dcb_list);
osd_th_alloced(oh);
memset(oti->oti_declare_ops, 0,
RETURN(ss->ss_node_id == range->lsr_index);
}
+/**
+ * Run the stop callbacks registered on an OSD transaction handle.
+ *
+ * Walks oth->ot_stop_dcb_list, unlinks every entry and invokes its
+ * dcb_func with a NULL environment, the embedded thandle (ot_super)
+ * and \a result.
+ *
+ * \param[in] oth OSD transaction handle whose stop callbacks are run
+ * \param[in] result value handed unchanged to every callback
+ */
+static void osd_trans_stop_cb(struct osd_thandle *oth, int result)
+{
+ struct dt_txn_commit_cb *dcb;
+ struct dt_txn_commit_cb *tmp;
+
+ /* call per-transaction stop callbacks if any */
+ /* the _safe iterator is required: each entry is unlinked inside
+ * the loop before its callback is called */
+ list_for_each_entry_safe(dcb, tmp, &oth->ot_stop_dcb_list,
+ dcb_linkage) {
+ LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
+ "commit callback entry: magic=%x name='%s'\n",
+ dcb->dcb_magic, dcb->dcb_name);
+ list_del_init(&dcb->dcb_linkage);
+ dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
+ }
+}
+
/*
* Concurrency: shouldn't matter.
*/
CERROR("%s: failed in transaction hook: rc = %d\n",
osd_name(osd), rc);
+ osd_trans_stop_cb(oh, rc);
/* hook functions might modify th_sync */
hdl->h_sync = th->th_sync;
CERROR("%s: failed to stop transaction: rc = %d\n",
osd_name(osd), rc);
} else {
+ osd_trans_stop_cb(oh, th->th_result);
OBD_FREE_PTR(oh);
}
LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
LASSERT(&dcb->dcb_func != NULL);
- list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
+ if (dcb->dcb_flags & DCB_TRANS_STOP)
+ list_add(&dcb->dcb_linkage, &oh->ot_stop_dcb_list);
+ else
+ list_add(&dcb->dcb_linkage, &oh->ot_commit_dcb_list);
return 0;
}
struct thandle ot_super;
handle_t *ot_handle;
struct ldiskfs_journal_cb_entry ot_jcb;
- struct list_head ot_dcb_list;
+ struct list_head ot_commit_dcb_list;
+ struct list_head ot_stop_dcb_list;
/* Link to the device, for debugging. */
struct lu_ref_link ot_dev_link;
unsigned short ot_credits;
#define DEBUG_SUBSYSTEM S_MDS
+#include <linux/kthread.h>
#include <obd_class.h>
#include <lustre_ioctl.h>
#include <lustre_param.h>
}
/**
+ * Initialize the osp_update structure in OSP device
+ *
+ * Allocate the osp_updates structure, initialize its list, lock, wait
+ * queues and version counters (both start at 1), then start the thread
+ * that sends updates to the remote MDT and wait until that thread
+ * reports either running or stopped.
+ *
+ * The function asserts that osp->opd_connect_mdt is set.
+ *
+ * \param[in] osp OSP device
+ *
+ * \retval 0 if initialization succeeds.
+ * \retval negative errno if initialization fails.
+ */
+static int osp_update_init(struct osp_device *osp)
+{
+ struct l_wait_info lwi = { 0 };
+ struct task_struct *task;
+
+ ENTRY;
+
+ LASSERT(osp->opd_connect_mdt);
+
+ OBD_ALLOC_PTR(osp->opd_update);
+ if (osp->opd_update == NULL)
+ RETURN(-ENOMEM);
+
+ init_waitqueue_head(&osp->opd_update_thread.t_ctl_waitq);
+ init_waitqueue_head(&osp->opd_update->ou_waitq);
+ spin_lock_init(&osp->opd_update->ou_lock);
+ INIT_LIST_HEAD(&osp->opd_update->ou_list);
+ osp->opd_update->ou_rpc_version = 1;
+ osp->opd_update->ou_version = 1;
+
+ /* start thread handling sending updates to the remote MDT */
+ task = kthread_run(osp_send_update_thread, osp,
+ "osp_up%u-%u", osp->opd_index, osp->opd_group);
+ if (IS_ERR(task)) {
+ int rc = PTR_ERR(task);
+
+ /* undo the allocation so that osp_update_fini() sees NULL
+ * and returns immediately */
+ OBD_FREE_PTR(osp->opd_update);
+ osp->opd_update = NULL;
+ CERROR("%s: can't start update thread: rc = %d\n",
+ osp->opd_obd->obd_name, rc);
+ RETURN(rc);
+ }
+
+ /* NOTE(review): 0 is returned below even when the thread reports
+ * SVC_STOPPED rather than SVC_RUNNING - confirm that a thread which
+ * exits early is handled by the callers. */
+ l_wait_event(osp->opd_update_thread.t_ctl_waitq,
+ osp_send_update_thread_running(osp) ||
+ osp_send_update_thread_stopped(osp), &lwi);
+
+ RETURN(0);
+}
+
+/**
+ * Finalize the osp_update structure in OSP device
+ *
+ * Stop the OSP update sending thread, then complete with -EIO and
+ * release every osp thandle still left in the sending list, and free
+ * the osp_updates structure. Does nothing if osp->opd_update is NULL.
+ *
+ * \param[in] env execution environment, passed to osp_trans_callback()
+ * \param[in] osp OSP device.
+ */
+static void osp_update_fini(const struct lu_env *env, struct osp_device *osp)
+{
+ struct osp_update_request *our;
+ struct osp_update_request *tmp;
+ struct osp_updates *ou = osp->opd_update;
+
+ if (ou == NULL)
+ return;
+
+ /* ask the thread to stop and wake it through the updates waitq */
+ osp->opd_update_thread.t_flags = SVC_STOPPING;
+ wake_up(&ou->ou_waitq);
+
+ /* block until the thread has flagged itself SVC_STOPPED */
+ wait_event(osp->opd_update_thread.t_ctl_waitq,
+ osp->opd_update_thread.t_flags & SVC_STOPPED);
+
+ /* Remove the left osp thandle from the list */
+ /* NOTE(review): osp_trans_callback() and osp_thandle_put() are
+ * called with the ou_lock spinlock held - confirm that neither
+ * can sleep. */
+ spin_lock(&ou->ou_lock);
+ list_for_each_entry_safe(our, tmp, &ou->ou_list,
+ our_list) {
+ list_del_init(&our->our_list);
+ LASSERT(our->our_th != NULL);
+ osp_trans_callback(env, our->our_th, -EIO);
+ /* our will be destroyed in osp_thandle_put() */
+ osp_thandle_put(our->our_th);
+ }
+ spin_unlock(&ou->ou_lock);
+
+ OBD_FREE_PTR(ou);
+ osp->opd_update = NULL;
+}
+
+/**
* Cleanup OSP, which includes disconnect import, cleanup unlink log, stop
* precreate threads etc.
*
switch (lcfg->lcfg_command) {
case LCFG_PRE_CLEANUP:
rc = osp_disconnect(d);
+ osp_update_fini(env, d);
break;
case LCFG_CLEANUP:
lu_dev_del_linkage(dev->ld_site, dev);
rc = osp_sync_init(env, osp);
if (rc < 0)
GOTO(out_precreat, rc);
+ } else {
+ rc = osp_update_init(osp);
+ if (rc != 0)
+ GOTO(out_fid, rc);
}
ns_register_cancel(obd->obd_namespace, osp_cancel_weight);
/* stop precreate thread */
if (!osp->opd_connect_mdt)
osp_precreate_fini(osp);
+ else
+ osp_update_fini(env, osp);
out_last_used:
if (!osp->opd_connect_mdt)
osp_last_used_fini(env, osp);
ENTRY;
if (osp->opd_async_requests != NULL) {
- dt_update_request_destroy(osp->opd_async_requests);
+ osp_update_request_destroy(osp->opd_async_requests);
osp->opd_async_requests = NULL;
}
int osp_pre_recovering;
};
+/**
+ * Tracking the updates being executed on this dt_device.
+ */
+struct osp_update_request {
+ /* OR-ed into ou_flags of every update packed by
+ * osp_update_rpc_pack() */
+ int our_flags;
+ /* update request result */
+ int our_rc;
+
+ /* Holding object updates sent to the remote target */
+ struct object_update_request *our_req;
+ /* size of the buffer our_req points to; enlarged by
+ * osp_extend_update_buffer() */
+ size_t our_req_size;
+
+ /* list of struct osp_update_callback (linked by ouc_list) */
+ struct list_head our_cb_items;
+
+ /* points to thandle if this update request belongs to one */
+ struct osp_thandle *our_th;
+ /* linked to the list(ou_list) in osp_updates */
+ struct list_head our_list;
+ /* presumably set once the request has been sent - TODO confirm
+ * against the sending code */
+ __u32 our_req_sent:1;
+};
+
+/* Per-OSP state for the update sending thread; allocated by
+ * osp_update_init() and freed by osp_update_fini(). */
+struct osp_updates {
+ /* osp_update_request entries, linked through our_list */
+ struct list_head ou_list;
+ /* taken around ou_list manipulation in osp_update_fini() */
+ spinlock_t ou_lock;
+ /* wait for next updates; also woken by osp_update_fini() after
+ * it sets SVC_STOPPING */
+ wait_queue_head_t ou_waitq;
+ /* both counters are initialised to 1 in osp_update_init();
+ * presumably used to keep updates sent in order - TODO confirm */
+ __u64 ou_rpc_version;
+ __u64 ou_version;
+};
+
struct osp_device {
struct dt_device opd_dt_dev;
/* corresponded OST index */
/* thread waits for signals about pool going empty */
wait_queue_head_t opd_pre_waitq;
+ /* send update thread */
+ struct osp_updates *opd_update;
+ /* dedicated update thread */
+ struct ptlrpc_thread opd_update_thread;
+
/*
* OST synchronization
*/
* osp_device::opd_async_requests via declare() functions, these
* requests can be packed together and sent to the remote server
* via single OUT RPC later. */
- struct dt_update_request *opd_async_requests;
+ struct osp_update_request *opd_async_requests;
/* Protect current operations on opd_async_requests. */
struct mutex opd_async_requests_mutex;
struct list_head opd_async_updates;
struct page **ooi_pages;
};
+#define OSP_THANDLE_MAGIC 0x20141214
struct osp_thandle {
struct thandle ot_super;
- struct dt_update_request *ot_dur;
/* OSP will use this thandle to update last oid*/
struct thandle *ot_storage_th;
- struct list_head ot_dcb_list;
+ __u32 ot_magic;
+ struct list_head ot_commit_dcb_list;
+ struct list_head ot_stop_dcb_list;
+ struct osp_update_request *ot_our;
atomic_t ot_refcount;
+ __u64 ot_version;
};
static inline struct osp_thandle *
return container_of(th, struct osp_thandle, ot_super);
}
-static inline struct dt_update_request *
-thandle_to_dt_update_request(struct thandle *th)
+static inline struct osp_update_request *
+thandle_to_osp_update_request(struct thandle *th)
{
struct osp_thandle *oth;
oth = thandle_to_osp_thandle(th);
- return oth->ot_dur;
+ return oth->ot_our;
}
/* The transaction only include the updates on the remote node, and
unsigned int index);
int osp_extend_update_buffer(const struct lu_env *env,
- struct update_buffer *ubuf);
+ struct osp_update_request *our);
#define osp_update_rpc_pack(env, name, update, op, ...) \
({ \
int ret; \
\
while (1) { \
- ureq = update->dur_buf.ub_req; \
- max_update_length = update->dur_buf.ub_req_size - \
+ ureq = update->our_req; \
+ max_update_length = update->our_req_size - \
object_update_request_size(ureq); \
\
object_update = update_buffer_get_update(ureq, \
- ureq->ourq_count); \
- ret = out_##name##_pack(env, object_update, max_update_length, \
+ ureq->ourq_count); \
+ ret = out_##name##_pack(env, object_update, \
+ max_update_length, \
__VA_ARGS__); \
if (ret == -E2BIG) { \
int rc1; \
/* extend the buffer and retry */ \
- rc1 = osp_extend_update_buffer(env, &update->dur_buf); \
+ rc1 = osp_extend_update_buffer(env, update); \
if (rc1 != 0) { \
ret = rc1; \
break; \
} \
} else { \
if (ret == 0) { \
- object_update->ou_flags |= update->dur_flags; \
+ object_update->ou_flags |= \
+ update->our_flags; \
ureq->ourq_count++; \
} \
break; \
ret; \
})
+/* Return true if the SVC_RUNNING bit is set in the t_flags of the OSP
+ * update thread (osp->opd_update_thread). */
+static inline bool osp_send_update_thread_running(struct osp_device *osp)
+{
+ return osp->opd_update_thread.t_flags & SVC_RUNNING;
+}
+
+/* Return true if the SVC_STOPPED bit is set in the t_flags of the OSP
+ * update thread (osp->opd_update_thread). */
+static inline bool osp_send_update_thread_stopped(struct osp_device *osp)
+{
+ return osp->opd_update_thread.t_flags & SVC_STOPPED;
+}
+
typedef int (*osp_update_interpreter_t)(const struct lu_env *env,
struct object_update_reply *rep,
struct ptlrpc_request *req,
int osp_unplug_async_request(const struct lu_env *env,
struct osp_device *osp,
- struct dt_update_request *update);
+ struct osp_update_request *update);
int osp_trans_update_request_create(struct thandle *th);
struct thandle *osp_trans_create(const struct lu_env *env,
struct dt_device *d);
int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
struct thandle *th);
int osp_insert_update_callback(const struct lu_env *env,
- struct dt_update_request *update,
+ struct osp_update_request *update,
struct osp_object *obj, void *data,
osp_update_interpreter_t interpreter);
-int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
- const struct object_update_request *ureq,
- struct ptlrpc_request **reqp);
-struct dt_update_request *dt_update_request_create(struct dt_device *dt);
-void dt_update_request_destroy(struct dt_update_request *dt_update);
+
+struct osp_update_request *osp_update_request_create(struct dt_device *dt);
+void osp_update_request_destroy(struct osp_update_request *update);
+
+int osp_send_update_thread(void *arg);
+int osp_check_and_set_rpc_version(struct osp_thandle *oth);
+
+void osp_thandle_destroy(struct osp_thandle *oth);
+/* Take one reference on an OSP transaction handle by incrementing
+ * ot_refcount. */
+static inline void osp_thandle_get(struct osp_thandle *oth)
+{
+ atomic_inc(&oth->ot_refcount);
+}
+
+/* Drop one reference on an OSP transaction handle; when ot_refcount
+ * reaches zero the handle is handed to osp_thandle_destroy(). */
+static inline void osp_thandle_put(struct osp_thandle *oth)
+{
+ if (atomic_dec_and_test(&oth->ot_refcount))
+ osp_thandle_destroy(oth);
+}
int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
const struct object_update_request *ureq,
struct ptlrpc_request **reqp);
int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
- struct dt_update_request *update,
+ struct osp_update_request *update,
struct ptlrpc_request **reqp);
struct thandle *osp_get_storage_thandle(const struct lu_env *env,
struct thandle *th,
struct osp_device *osp);
+void osp_trans_callback(const struct lu_env *env,
+ struct osp_thandle *oth, int rc);
/* osp_object.c */
int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
struct lu_attr *attr);
/**
* Implementation of dt_object_operations::do_declare_create
*
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
* in the transaction.
*
* \param[in] env execution environment
}
+/**
+ * Enlarge the object update request buffer of an osp_update_request
+ *
+ * Allocate a buffer that is OUT_UPDATE_BUFFER_SIZE_ADD bytes larger than
+ * the current one, copy the existing request into it, free the old
+ * buffer and store the new pointer and size in \a our.
+ *
+ * \param[in] env execution environment (not referenced in this function)
+ * \param[in,out] our update request whose buffer is enlarged
+ *
+ * \retval 0 if the buffer was enlarged
+ * \retval -E2BIG if the new size exceeds OUT_UPDATE_BUFFER_SIZE_MAX
+ * \retval -ENOMEM if the larger buffer cannot be allocated
+ */
int osp_extend_update_buffer(const struct lu_env *env,
- struct update_buffer *ubuf)
+ struct osp_update_request *our)
{
- struct object_update_request *ureq;
- size_t new_size = ubuf->ub_req_size + OUT_UPDATE_BUFFER_SIZE_ADD;
+ struct object_update_request *obj_update_req;
+ size_t new_size = our->our_req_size + OUT_UPDATE_BUFFER_SIZE_ADD;
/* enlarge object update request size */
if (new_size > OUT_UPDATE_BUFFER_SIZE_MAX)
return -E2BIG;
- OBD_ALLOC_LARGE(ureq, new_size);
- if (ureq == NULL)
+ OBD_ALLOC_LARGE(obj_update_req, new_size);
+ if (obj_update_req == NULL)
return -ENOMEM;
- memcpy(ureq, ubuf->ub_req, ubuf->ub_req_size);
+ memcpy(obj_update_req, our->our_req, our->our_req_size);
- OBD_FREE_LARGE(ubuf->ub_req, ubuf->ub_req_size);
+ OBD_FREE_LARGE(our->our_req, our->our_req_size);
- ubuf->ub_req = ureq;
- ubuf->ub_req_size = new_size;
+ our->our_req = obj_update_req;
+ our->our_req_size = new_size;
return 0;
}
struct lu_attr *attr, struct dt_allocation_hint *hint,
struct dt_object_format *dof, struct thandle *th)
{
- struct dt_update_request *update;
+ struct osp_update_request *update;
struct osp_object *obj = dt2osp_obj(dt);
int rc;
- update = thandle_to_dt_update_request(th);
+ update = thandle_to_osp_update_request(th);
LASSERT(update != NULL);
LASSERT(attr->la_valid & LA_TYPE);
/**
* Implementation of dt_object_operations::do_declare_ref_del
*
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
* in the transaction.
*
* \param[in] env execution environment
static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt,
struct thandle *th)
{
- struct dt_update_request *update;
+ struct osp_update_request *update;
int rc;
- update = thandle_to_dt_update_request(th);
+ update = thandle_to_osp_update_request(th);
LASSERT(update != NULL);
rc = osp_update_rpc_pack(env, ref_del, update, OUT_REF_DEL,
/**
* Implementation of dt_object_operations::do_declare_ref_del
*
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
* in the transaction.
*
* \param[in] env execution environment
static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt,
struct thandle *th)
{
- struct dt_update_request *update;
+ struct osp_update_request *update;
int rc;
- update = thandle_to_dt_update_request(th);
+ update = thandle_to_osp_update_request(th);
LASSERT(update != NULL);
rc = osp_update_rpc_pack(env, ref_add, update, OUT_REF_ADD,
/**
* Implementation of dt_object_operations::do_declare_attr_get
*
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
* in the transaction.
*
* \param[in] env execution environment
int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
const struct lu_attr *attr, struct thandle *th)
{
- struct dt_update_request *update;
+ struct osp_update_request *update;
int rc;
- update = thandle_to_dt_update_request(th);
+ update = thandle_to_osp_update_request(th);
LASSERT(update != NULL);
rc = osp_update_rpc_pack(env, attr_set, update, OUT_ATTR_SET,
struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2;
struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
struct dt_device *dt_dev = &osp->opd_dt_dev;
- struct dt_update_request *update;
+ struct osp_update_request *update;
struct object_update_reply *reply;
struct ptlrpc_request *req = NULL;
struct lu_fid *fid;
* just create an update buffer, instead of attaching the
* update_remote list of the thandle.
*/
- update = dt_update_request_create(dt_dev);
+ update = osp_update_request_create(dt_dev);
if (IS_ERR(update))
RETURN(PTR_ERR(update));
if (req != NULL)
ptlrpc_req_finished(req);
- dt_update_request_destroy(update);
+ osp_update_request_destroy(update);
return rc;
}
/**
* Implementation of dt_index_operations::dio_declare_insert
*
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
* in the transaction.
*
* \param[in] env execution environment
struct thandle *th,
int ignore_quota)
{
- struct osp_thandle *oth = thandle_to_osp_thandle(th);
- struct dt_update_request *update = oth->ot_dur;
+ struct osp_update_request *update;
int rc;
+ update = thandle_to_osp_update_request(th);
+ LASSERT(update != NULL);
+
rc = osp_update_rpc_pack(env, index_insert, update, OUT_INDEX_INSERT,
lu_object_fid(&dt->do_lu), rec, key);
return rc;
/**
* Implementation of dt_index_operations::dio_declare_delete
*
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
* in the transaction.
*
* \param[in] env execution environment
const struct dt_key *key,
struct thandle *th)
{
- struct dt_update_request *update;
+ struct osp_update_request *update;
int rc;
- update = thandle_to_dt_update_request(th);
+ update = thandle_to_osp_update_request(th);
LASSERT(update != NULL);
rc = osp_update_rpc_pack(env, index_delete, update, OUT_INDEX_DELETE,
{
struct osp_object *o = dt2osp_obj(dt);
struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
- struct dt_update_request *update;
+ struct osp_update_request *update;
int rc = 0;
ENTRY;
o->opo_non_exist = 1;
LASSERT(osp->opd_connect_mdt);
- update = thandle_to_dt_update_request(th);
+ update = thandle_to_osp_update_request(th);
LASSERT(update != NULL);
rc = osp_update_rpc_pack(env, object_destroy, update, OUT_DESTROY,
/**
* Implementation of dt_body_operations::dbo_declare_write
*
- * Create the dt_update_request to track the update for this OSP
+ * Create the osp_update_request to track the update for this OSP
* in the transaction.
*
* \param[in] env execution environment
const struct lu_buf *buf, loff_t *pos,
struct thandle *th, int ignore_quota)
{
- struct osp_object *obj = dt2osp_obj(dt);
- struct dt_update_request *update;
+ struct osp_object *obj = dt2osp_obj(dt);
+ struct osp_update_request *update;
+ struct osp_thandle *oth = thandle_to_osp_thandle(th);
ssize_t rc;
ENTRY;
- update = thandle_to_dt_update_request(th);
+ update = thandle_to_osp_update_request(th);
LASSERT(update != NULL);
rc = osp_update_rpc_pack(env, write, update, OUT_WRITE,
obj->opo_ooa->ooa_attr.la_size < *pos)
obj->opo_ooa->ooa_attr.la_size = *pos;
+ rc = osp_check_and_set_rpc_version(oth);
+ if (rc < 0)
+ RETURN(rc);
+
RETURN(buf->lb_len);
}
struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
struct dt_device *dt_dev = &osp->opd_dt_dev;
struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2;
- struct dt_update_request *update;
+ struct osp_update_request *update;
struct object_update_reply *reply;
struct out_read_reply *orr;
struct ptlrpc_request *req = NULL;
/* Because it needs send the update buffer right away,
* just create an update buffer, instead of attaching the
- * update_remote list of the thandle. */
- update = dt_update_request_create(dt_dev);
+ * update_remote list of the thandle. */
+ update = osp_update_request_create(dt_dev);
if (IS_ERR(update))
RETURN(PTR_ERR(update));
if (req != NULL)
ptlrpc_req_finished(req);
- dt_update_request_destroy(update);
+ osp_update_request_destroy(update);
return rc;
}
struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
struct osp_object *obj = dt2osp_obj(dt);
struct dt_device *dev = &osp->opd_dt_dev;
- struct dt_update_request *update;
+ struct osp_update_request *update;
struct object_update_reply *reply;
struct ptlrpc_request *req = NULL;
int rc = 0;
spin_unlock(&obj->opo_lock);
}
- update = dt_update_request_create(dev);
+ update = osp_update_request_create(dev);
if (IS_ERR(update))
RETURN(PTR_ERR(update));
if (req != NULL)
ptlrpc_req_finished(req);
- dt_update_request_destroy(update);
+ osp_update_request_destroy(update);
return rc;
}
mutex_unlock(&osp->opd_async_requests_mutex);
osp_oac_xattr_put(oxe);
} else {
- struct dt_update_request *update;
+ struct osp_update_request *update;
/* XXX: Currently, we trigger the batched async OUT
* RPC via dt_declare_xattr_get(). It is not
*
* We will improve it in the future. */
update = osp->opd_async_requests;
- if (update != NULL && update->dur_buf.ub_req != NULL &&
- update->dur_buf.ub_req->ourq_count > 0) {
+ if (update != NULL && update->our_req != NULL &&
+ update->our_req->ourq_count > 0) {
osp->opd_async_requests = NULL;
mutex_unlock(&osp->opd_async_requests_mutex);
rc = osp_unplug_async_request(env, osp, update);
struct osp_object *obj = dt2osp_obj(dt);
struct dt_device *dev = &osp->opd_dt_dev;
struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2;
- struct dt_update_request *update = NULL;
+ struct osp_update_request *update = NULL;
struct ptlrpc_request *req = NULL;
struct object_update_reply *reply;
struct osp_xattr_entry *oxe = NULL;
spin_unlock(&obj->opo_lock);
}
- update = dt_update_request_create(dev);
+ update = osp_update_request_create(dev);
if (IS_ERR(update))
GOTO(out, rc = PTR_ERR(update));
ptlrpc_req_finished(req);
if (update != NULL && !IS_ERR(update))
- dt_update_request_destroy(update);
+ osp_update_request_destroy(update);
if (oxe != NULL)
osp_oac_xattr_put(oxe);
struct thandle *th)
{
struct osp_object *o = dt2osp_obj(dt);
- struct dt_update_request *update;
+ struct osp_update_request *update;
struct osp_xattr_entry *oxe;
int rc;
ENTRY;
- update = thandle_to_dt_update_request(th);
+ update = thandle_to_osp_update_request(th);
LASSERT(update != NULL);
CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n",
int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
const char *name, struct thandle *th)
{
- struct dt_update_request *update;
+ struct osp_update_request *update;
const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
- struct osp_object *o = dt2osp_obj(dt);
+ struct osp_object *o = dt2osp_obj(dt);
struct osp_xattr_entry *oxe;
int rc;
- update = thandle_to_dt_update_request(th);
+ update = thandle_to_osp_update_request(th);
LASSERT(update != NULL);
rc = osp_update_rpc_pack(env, xattr_del, update, OUT_XATTR_DEL,
* The argument for the interpreter callback of osp request.
*/
struct osp_update_args {
- struct dt_update_request *oaua_update;
+ struct osp_update_request *oaua_update;
atomic_t *oaua_count;
wait_queue_head_t *oaua_waitq;
bool oaua_flow_control;
* Call back for each update request.
*/
struct osp_update_callback {
- /* list in the dt_update_request::dur_cb_items */
+ /* list in the osp_update_request::our_cb_items */
struct list_head ouc_list;
/* The target of the async update request. */
}
/**
- * Allocate and initialize dt_update_request
+ * Allocate and initialize osp_update_request
*
- * dt_update_request is being used to track updates being executed on
+ * osp_update_request is being used to track updates being executed on
* this dt_device(OSD or OSP). The update buffer will be 4k initially,
* and increased if needed.
*
* \param [in] dt dt device
*
- * \retval dt_update_request being allocated if succeed
+ * \retval osp_update_request being allocated if succeed
* \retval ERR_PTR(errno) if failed
*/
-struct dt_update_request *dt_update_request_create(struct dt_device *dt)
+struct osp_update_request *osp_update_request_create(struct dt_device *dt)
{
- struct dt_update_request *dt_update;
+ struct osp_update_request *osp_update_req;
struct object_update_request *ourq;
- OBD_ALLOC_PTR(dt_update);
- if (dt_update == NULL)
+ OBD_ALLOC_PTR(osp_update_req);
+ if (osp_update_req == NULL)
return ERR_PTR(-ENOMEM);
ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE);
if (IS_ERR(ourq)) {
- OBD_FREE_PTR(dt_update);
+ OBD_FREE_PTR(osp_update_req);
return ERR_CAST(ourq);
}
- dt_update->dur_buf.ub_req = ourq;
- dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
+ osp_update_req->our_req = ourq;
+ osp_update_req->our_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
- dt_update->dur_dt = dt;
- dt_update->dur_batchid = 0;
- INIT_LIST_HEAD(&dt_update->dur_cb_items);
+ INIT_LIST_HEAD(&osp_update_req->our_cb_items);
+ INIT_LIST_HEAD(&osp_update_req->our_list);
- return dt_update;
+ return osp_update_req;
}
-/**
- * Destroy dt_update_request
- *
- * \param [in] dt_update dt_update_request being destroyed
- */
-void dt_update_request_destroy(struct dt_update_request *dt_update)
+void osp_update_request_destroy(struct osp_update_request *our)
{
- if (dt_update == NULL)
+ if (our == NULL)
return;
- object_update_request_free(dt_update->dur_buf.ub_req,
- dt_update->dur_buf.ub_req_size);
- OBD_FREE_PTR(dt_update);
+ object_update_request_free(our->our_req,
+ our->our_req_size);
+ OBD_FREE_PTR(our);
}
static void
ourq->ourq_magic, ourq->ourq_count, total_size);
}
+/**
+ * Run the stop callbacks registered on an OSP transaction handle.
+ *
+ * Every entry on oth->ot_stop_dcb_list is unlinked from the list and then
+ * invoked with a NULL environment, the embedded thandle and \a result.
+ *
+ * \param[in] oth	osp thandle whose stop callbacks are run
+ * \param[in] result	value handed to each callback
+ */
+static void osp_trans_stop_cb(struct osp_thandle *oth, int result)
+{
+	struct list_head	*stop_list = &oth->ot_stop_dcb_list;
+	struct dt_txn_commit_cb	*cb;
+	struct dt_txn_commit_cb	*next;
+
+	/* the _safe iterator is required: each entry is removed from the
+	 * list before its callback is called */
+	list_for_each_entry_safe(cb, next, stop_list, dcb_linkage) {
+		LASSERTF(cb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
+			 "commit callback entry: magic=%x name='%s'\n",
+			 cb->dcb_magic, cb->dcb_name);
+		list_del_init(&cb->dcb_linkage);
+		cb->dcb_func(NULL, &oth->ot_super, cb, result);
+	}
+}
+
/**
* Allocate an osp request and initialize it with the given parameters.
*
{
struct object_update_reply *reply = NULL;
struct osp_update_args *oaua = arg;
- struct dt_update_request *dt_update = oaua->oaua_update;
+ struct osp_update_request *our = oaua->oaua_update;
+ struct osp_thandle *oth;
struct osp_update_callback *ouc;
struct osp_update_callback *next;
int count = 0;
int index = 0;
int rc1 = 0;
- if (oaua->oaua_flow_control)
- obd_put_request_slot(
- &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli);
+ ENTRY;
+
+ if (our == NULL)
+ RETURN(0);
+
+ oaua->oaua_update = NULL;
+ oth = our->our_th;
+ if (oaua->oaua_flow_control) {
+ struct osp_device *osp;
+
+ LASSERT(oth != NULL);
+ osp = dt2osp_dev(oth->ot_super.th_dev);
+ obd_put_request_slot(&osp->opd_obd->u.cli);
+ }
/* Unpack the results from the reply message. */
if (req->rq_repmsg != NULL) {
rc1 = rc;
}
- list_for_each_entry_safe(ouc, next, &dt_update->dur_cb_items,
- ouc_list) {
+ list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) {
list_del_init(&ouc->ouc_list);
/* The peer may only have handled some requests (indicated
if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
wake_up_all(oaua->oaua_waitq);
- dt_update_request_destroy(dt_update);
+ if (oth != NULL) {
+ /* oth and osp_update_requests will be destroyed in
+ * osp_thandle_put */
+ osp_trans_stop_cb(oth, rc);
+ osp_thandle_put(oth);
+ } else {
+ osp_update_request_destroy(our);
+ }
- return 0;
+ RETURN(0);
}
/**
*
* \param[in] env pointer to the thread context
* \param[in] osp pointer to the OSP device
- * \param[in] update pointer to the shared queue
+ * \param[in] our pointer to the shared queue
*
* \retval 0 for success
* \retval negative error number on failure
*/
int osp_unplug_async_request(const struct lu_env *env,
struct osp_device *osp,
- struct dt_update_request *update)
+ struct osp_update_request *our)
{
struct osp_update_args *args;
struct ptlrpc_request *req = NULL;
int rc;
rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
- update->dur_buf.ub_req, &req);
+ our->our_req, &req);
if (rc != 0) {
struct osp_update_callback *ouc;
struct osp_update_callback *next;
list_for_each_entry_safe(ouc, next,
- &update->dur_cb_items, ouc_list) {
+ &our->our_cb_items, ouc_list) {
list_del_init(&ouc->ouc_list);
if (ouc->ouc_interpreter != NULL)
ouc->ouc_interpreter(env, NULL, NULL,
ouc->ouc_data, 0, rc);
osp_update_callback_fini(env, ouc);
}
- dt_update_request_destroy(update);
+ osp_update_request_destroy(our);
} else {
args = ptlrpc_req_async_args(req);
- args->oaua_update = update;
+ args->oaua_update = our;
args->oaua_count = NULL;
args->oaua_waitq = NULL;
args->oaua_flow_control = false;
* request queue - osp_device::opd_async_requests.
*
* If the osp_device::opd_async_requests is not NULL, then return it directly;
- * otherwise create new dt_update_request and attach it to opd_async_requests.
+ * otherwise create new osp_update_request and attach it to opd_async_requests.
*
* \param[in] osp pointer to the OSP device
*
* \retval pointer to the shared queue
* \retval negative error number on failure
*/
-static struct dt_update_request *
+static struct osp_update_request *
osp_find_or_create_async_update_request(struct osp_device *osp)
{
- struct dt_update_request *update = osp->opd_async_requests;
+ struct osp_update_request *our = osp->opd_async_requests;
+
+ if (our != NULL)
+ return our;
- if (update != NULL)
- return update;
+ our = osp_update_request_create(&osp->opd_dt_dev);
+ if (IS_ERR(our))
+ return our;
- update = dt_update_request_create(&osp->opd_dt_dev);
- if (!IS_ERR(update))
- osp->opd_async_requests = update;
+ osp->opd_async_requests = our;
- return update;
+ return our;
}
/**
- * Insert an osp_update_callback into the dt_update_request.
+ * Insert an osp_update_callback into the osp_update_request.
*
- * Insert an osp_update_callback to the dt_update_request. Usually each update
- * in the dt_update_request will have one correspondent callback, and these
+ * Insert an osp_update_callback to the osp_update_request. Usually each update
+ * in the osp_update_request will have one correspondent callback, and these
* callbacks will be called in rq_interpret_reply.
*
* \param[in] env pointer to the thread context
* \retval negative error number on failure
*/
int osp_insert_update_callback(const struct lu_env *env,
- struct dt_update_request *update,
+ struct osp_update_request *our,
struct osp_object *obj, void *data,
osp_update_interpreter_t interpreter)
{
if (ouc == NULL)
RETURN(-ENOMEM);
- list_add_tail(&ouc->ouc_list, &update->dur_cb_items);
+ list_add_tail(&ouc->ouc_list, &our->our_cb_items);
return 0;
}
__u16 *lens, const void **bufs, void *data,
osp_update_interpreter_t interpreter)
{
- struct osp_device *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
- struct dt_update_request *update;
+ struct osp_device *osp;
+ struct osp_update_request *our;
struct object_update *object_update;
size_t max_update_size;
struct object_update_request *ureq;
int rc = 0;
ENTRY;
- update = osp_find_or_create_async_update_request(osp);
- if (IS_ERR(update))
- RETURN(PTR_ERR(update));
+ osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
+ our = osp_find_or_create_async_update_request(osp);
+ if (IS_ERR(our))
+ RETURN(PTR_ERR(our));
again:
- ureq = update->dur_buf.ub_req;
- max_update_size = update->dur_buf.ub_req_size -
- object_update_request_size(ureq);
+ ureq = our->our_req;
+ max_update_size = our->our_req_size - object_update_request_size(ureq);
object_update = update_buffer_get_update(ureq, ureq->ourq_count);
rc = out_update_pack(env, object_update, max_update_size, op,
osp->opd_async_requests = NULL;
mutex_unlock(&osp->opd_async_requests_mutex);
- rc = osp_unplug_async_request(env, osp, update);
+ rc = osp_unplug_async_request(env, osp, our);
mutex_lock(&osp->opd_async_requests_mutex);
if (rc != 0)
RETURN(rc);
- update = osp_find_or_create_async_update_request(osp);
- if (IS_ERR(update))
- RETURN(PTR_ERR(update));
+ our = osp_find_or_create_async_update_request(osp);
+ if (IS_ERR(our))
+ RETURN(PTR_ERR(our));
goto again;
} else {
ureq->ourq_count++;
}
- rc = osp_insert_update_callback(env, update, obj, data, interpreter);
+ rc = osp_insert_update_callback(env, our, obj, data, interpreter);
RETURN(rc);
}
int osp_trans_update_request_create(struct thandle *th)
{
struct osp_thandle *oth = thandle_to_osp_thandle(th);
- struct dt_update_request *update;
+ struct osp_update_request *our;
- if (oth->ot_dur != NULL)
+ if (oth->ot_our != NULL)
return 0;
- update = dt_update_request_create(th->th_dev);
- if (IS_ERR(update)) {
- th->th_result = PTR_ERR(update);
- return PTR_ERR(update);
+ our = osp_update_request_create(th->th_dev);
+ if (IS_ERR(our)) {
+ th->th_result = PTR_ERR(our);
+ return PTR_ERR(our);
}
if (dt2osp_dev(th->th_dev)->opd_connect_mdt)
- update->dur_flags = UPDATE_FL_SYNC;
+ our->our_flags = UPDATE_FL_SYNC;
- oth->ot_dur = update;
+ oth->ot_our = our;
+ our->our_th = oth;
return 0;
}
-static void osp_thandle_get(struct osp_thandle *oth)
-{
- atomic_inc(&oth->ot_refcount);
-}
-
-static void osp_thandle_put(struct osp_thandle *oth)
+void osp_thandle_destroy(struct osp_thandle *oth)
{
- if (atomic_dec_and_test(&oth->ot_refcount))
- OBD_FREE_PTR(oth);
+ LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
+ LASSERT(list_empty(&oth->ot_commit_dcb_list));
+ LASSERT(list_empty(&oth->ot_stop_dcb_list));
+ if (oth->ot_our != NULL)
+ osp_update_request_destroy(oth->ot_our);
+ OBD_FREE_PTR(oth);
}
/**
if (unlikely(oth == NULL))
RETURN(ERR_PTR(-ENOMEM));
+ oth->ot_magic = OSP_THANDLE_MAGIC;
th = &oth->ot_super;
th->th_dev = d;
th->th_tags = LCT_TX_HANDLE;
atomic_set(&oth->ot_refcount, 1);
- INIT_LIST_HEAD(&oth->ot_dcb_list);
+ INIT_LIST_HEAD(&oth->ot_commit_dcb_list);
+ INIT_LIST_HEAD(&oth->ot_stop_dcb_list);
RETURN(th);
}
* Prepare OUT update ptlrpc request, and the request usually includes
* all of updates (stored in \param ureq) from one operation.
*
- * \param[in] env execution environment
- * \param[in] imp import on which ptlrpc request will be sent
- * \param[in] ureq hold all of updates which will be packed into the req
- * \param[in] reqp request to be created
+ * \param[in] env execution environment
+ * \param[in] imp import on which ptlrpc request will be sent
+ * \param[in] ureq hold all of updates which will be packed into the req
+ * \param[in] reqp request to be created
*
- * \retval 0 if preparation succeeds.
- * \retval negative errno if preparation fails.
+ * \retval 0 if preparation succeeds.
+ * \retval negative errno if preparation fails.
*/
int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
const struct object_update_request *ureq,
struct ptlrpc_request **reqp)
{
- struct ptlrpc_request *req;
- struct object_update_request *tmp;
- int ureq_len;
- int rc;
+ struct ptlrpc_request *req;
+ struct object_update_request *tmp;
+ size_t ureq_len;
+ int rc;
ENTRY;
object_update_request_dump(ureq, D_INFO);
}
req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
- RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+ RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
memcpy(tmp, ureq, ureq_len);
}
/**
-* Send update RPC.
-*
-* Send update request to the remote MDT synchronously.
-*
-* \param[in] env execution environment
-* \param[in] imp import on which ptlrpc request will be sent
-* \param[in] dt_update hold all of updates which will be packed into the req
-* \param[in] reqp request to be created
-*
-* \retval 0 if RPC succeeds.
-* \retval negative errno if RPC fails.
-*/
+ * Send update RPC.
+ *
+ * Send update request to the remote MDT synchronously.
+ *
+ * \param[in] env execution environment
+ * \param[in] imp import on which ptlrpc request will be sent
+ * \param[in] our hold all of updates which will be packed into the req
+ * \param[in] reqp request to be created
+ *
+ * \retval 0 if RPC succeeds.
+ * \retval negative errno if RPC fails.
+ */
+
int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
- struct dt_update_request *dt_update,
+ struct osp_update_request *our,
struct ptlrpc_request **reqp)
{
struct obd_import *imp = osp->opd_obd->u.cli.cl_import;
int rc;
ENTRY;
- rc = osp_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req);
+ rc = osp_prep_update_req(env, imp, our->our_req, &req);
if (rc != 0)
RETURN(rc);
rc = ptlrpc_queue_wait(req);
if (rc < 0) {
ptlrpc_req_finished(req);
- dt_update->dur_rc = rc;
+ our->our_rc = rc;
RETURN(rc);
}
RETURN(rc);
}
- dt_update->dur_rc = rc;
+ our->our_rc = rc;
ptlrpc_req_finished(req);
LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
LASSERT(&dcb->dcb_func != NULL);
- list_add(&dcb->dcb_linkage, &oth->ot_dcb_list);
-
+ if (dcb->dcb_flags & DCB_TRANS_STOP)
+ list_add(&dcb->dcb_linkage, &oth->ot_stop_dcb_list);
+ else
+ list_add(&dcb->dcb_linkage, &oth->ot_commit_dcb_list);
return 0;
}
LASSERT(atomic_read(&oth->ot_refcount) > 0);
/* call per-transaction callbacks if any */
- list_for_each_entry_safe(dcb, tmp, &oth->ot_dcb_list,
+ list_for_each_entry_safe(dcb, tmp, &oth->ot_commit_dcb_list,
dcb_linkage) {
LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
"commit callback entry: magic=%x name='%s'\n",
static void osp_request_commit_cb(struct ptlrpc_request *req)
{
- struct thandle *th = req->rq_cb_data;
- struct osp_thandle *oth = thandle_to_osp_thandle(th);
+ struct thandle *th = req->rq_cb_data;
+ struct osp_thandle *oth;
__u64 last_committed_transno = 0;
int result = req->rq_status;
ENTRY;
+ if (th == NULL)
+ RETURN_EXIT;
+
+ oth = thandle_to_osp_thandle(th);
if (lustre_msg_get_last_committed(req->rq_repmsg))
last_committed_transno =
lustre_msg_get_last_committed(req->rq_repmsg);
}
/**
- * Trigger the request for remote updates.
+ * callback of osp transaction
*
- * If th_sync is set, then the request will be sent synchronously,
- * otherwise, the RPC will be sent asynchronously.
+ * Call all of callbacks for this osp thandle. This will only be
+ * called in error handler path. In the normal processing path,
+ * these callback will be called in osp_request_commit_cb() and
+ * osp_update_interpret().
+ *
+ * \param [in] env execution environment
+ * \param [in] oth osp thandle
+ * \param [in] rc result of the osp thandle
+ */
+void osp_trans_callback(const struct lu_env *env,
+			struct osp_thandle *oth, int rc)
+{
+	struct osp_update_request	*our = oth->ot_our;
+	struct osp_update_callback	*cb;
+	struct osp_update_callback	*cb_next;
+
+	/* Drain the per-update callbacks still queued on the update
+	 * request (if there is one): each entry is unlinked, its
+	 * interpreter (when set) is called with \a rc, and the entry is
+	 * then finalized. */
+	if (our != NULL) {
+		list_for_each_entry_safe(cb, cb_next, &our->our_cb_items,
+					 ouc_list) {
+			list_del_init(&cb->ouc_list);
+			if (cb->ouc_interpreter != NULL)
+				cb->ouc_interpreter(env, NULL, NULL,
+						    cb->ouc_obj,
+						    cb->ouc_data, 0, rc);
+			osp_update_callback_fini(env, cb);
+		}
+	}
+
+	/* Then the per-transaction callbacks: stop list first, commit
+	 * list second. */
+	osp_trans_stop_cb(oth, rc);
+	osp_trans_commit_cb(oth, rc);
+}
+
+/**
+ * Send the request for remote updates.
+ *
+ * Send updates to the remote MDT. Prepare the request by osp_update_req
+ * and send them to remote MDT, for sync request, it will wait
+ * until the reply return, otherwise hand it to ptlrpcd.
*
* Please refer to osp_trans_create() for transaction type.
*
* \param[in] env pointer to the thread context
* \param[in] osp pointer to the OSP device
- * \param[in] dt_update pointer to the dt_update_request
- * \param[in] th pointer to the transaction handler
- * \param[out] sent whether the RPC has been sent
+ * \param[in] our pointer to the osp_update_request
*
* \retval 0 for success
* \retval negative error number on failure
*/
-static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
- struct dt_update_request *dt_update,
- struct thandle *th, int *sent)
+static int osp_send_update_req(const struct lu_env *env,
+ struct osp_device *osp,
+ struct osp_update_request *our)
{
struct osp_update_args *args;
struct ptlrpc_request *req;
+ struct lu_device *top_device;
+ struct osp_thandle *oth = our->our_th;
int rc = 0;
ENTRY;
+ LASSERT(oth != NULL);
+ LASSERT(our->our_req_sent == 0);
rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
- dt_update->dur_buf.ub_req, &req);
- if (rc != 0)
+ our->our_req, &req);
+ if (rc != 0) {
+ osp_trans_callback(env, oth, rc);
RETURN(rc);
+ }
- *sent = 1;
- req->rq_interpret_reply = osp_update_interpret;
args = ptlrpc_req_async_args(req);
- args->oaua_update = dt_update;
+ args->oaua_update = our;
+ osp_thandle_get(oth); /* hold for update interpret */
+ req->rq_interpret_reply = osp_update_interpret;
+ if (!oth->ot_super.th_wait_submit && !oth->ot_super.th_sync) {
+ if (!osp->opd_imp_active || !osp->opd_imp_connected) {
+ osp_trans_callback(env, oth, rc);
+ osp_thandle_put(oth);
+ GOTO(out, rc = -ENOTCONN);
+ }
- if (is_only_remote_trans(th) && !th->th_sync &&
- !th->th_wait_submit) {
+ rc = obd_get_request_slot(&osp->opd_obd->u.cli);
+ if (rc != 0) {
+ osp_trans_callback(env, oth, rc);
+ osp_thandle_put(oth);
+ GOTO(out, rc = -ENOTCONN);
+ }
args->oaua_flow_control = true;
if (!osp->opd_connect_mdt) {
}
ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+ req = NULL;
} else {
- struct osp_thandle *oth = thandle_to_osp_thandle(th);
- struct lu_device *top_device;
+ osp_thandle_get(oth); /* hold for commit callback */
+ req->rq_commit_cb = osp_request_commit_cb;
+ req->rq_cb_data = &oth->ot_super;
+ args->oaua_flow_control = false;
/* If the transaction is created during MDT recoverying
* process, it means this is an recovery update, we need
if (top_device->ld_obd->obd_recovering)
req->rq_allow_replay = 1;
- args->oaua_flow_control = false;
- req->rq_commit_cb = osp_request_commit_cb;
- req->rq_cb_data = th;
- osp_thandle_get(oth); /* for commit callback */
osp_get_rpc_lock(osp);
rc = ptlrpc_queue_wait(req);
osp_put_rpc_lock(osp);
- if (req->rq_transno == 0 && !req->rq_committed)
+ if ((rc == -ENOMEM && req->rq_set == NULL) ||
+ (req->rq_transno == 0 && !req->rq_committed)) {
+ if (args->oaua_update != NULL) {
+ /* If osp_update_interpret is not being called,
+ * release the osp_thandle */
+ args->oaua_update = NULL;
+ osp_thandle_put(oth);
+ }
+
+ req->rq_cb_data = NULL;
+ rc = rc == 0 ? req->rq_status : rc;
+ osp_trans_callback(env, oth, rc);
osp_thandle_put(oth);
- else
- oth->ot_dur = NULL;
- ptlrpc_req_finished(req);
+ GOTO(out, rc);
+ }
}
+out:
+ if (req != NULL)
+ ptlrpc_req_finished(req);
RETURN(rc);
}
}
/**
+ * Set version for the transaction
+ *
+ * Set the version for the transaction, then the osp RPC will be
+ * sent in the order of version, i.e. the transaction with lower
+ * version will be sent first.
+ *
+ * A thandle that already carries a non-zero version is left untouched.
+ * Otherwise the current ou_version is assigned to it and ou_version is
+ * incremented, both under ou_lock.
+ *
+ * \param [in] oth	osp thandle to be set version.
+ *
+ * \retval		0 if the thandle has a version (already set or
+ *			newly assigned)
+ * \retval		-EIO if the OSP device has no update structure
+ */
+int osp_check_and_set_rpc_version(struct osp_thandle *oth)
+{
+	struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev);
+	struct osp_updates *ou = osp->opd_update;
+	__u64 next_version;
+
+	if (ou == NULL)
+		return -EIO;
+
+	if (oth->ot_version != 0)
+		return 0;
+
+	spin_lock(&ou->ou_lock);
+	oth->ot_version = ou->ou_version++;
+	/* snapshot for the debug message below, so that ou_version is not
+	 * read after the lock protecting it has been dropped */
+	next_version = ou->ou_version;
+	spin_unlock(&ou->ou_lock);
+
+	CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n",
+	       osp->opd_obd->obd_name, next_version, oth, oth->ot_version);
+
+	return 0;
+}
+
+/**
+ * Get next OSP update request in the sending list
+ *
+ * Walk the sending list under ou_lock and detach the first request that
+ * is eligible to be sent, which is either
+ * 1. a request whose transaction does not have a version number
+ *    (ot_version == 0), or
+ * 2. a request whose transaction version == ou_rpc_version.
+ *
+ * \param [in] ou	osp update structure.
+ * \param [out] ourp	the pointer holding the next update request; it is
+ *			only written when a request is found.
+ *
+ * \retval		true if getting the next transaction.
+ * \retval		false if not getting the next transaction.
+ */
+static bool
+osp_get_next_request(struct osp_updates *ou, struct osp_update_request **ourp)
+{
+	struct osp_update_request *cur;
+	struct osp_update_request *next;
+	bool found = false;
+
+	spin_lock(&ou->ou_lock);
+	list_for_each_entry_safe(cur, next, &ou->ou_list, our_list) {
+		LASSERT(cur->our_th != NULL);
+		CDEBUG(D_INFO, "our %p version "LPU64" rpc_version "LPU64"\n",
+		       cur, cur->our_th->ot_version, ou->ou_rpc_version);
+
+		/* versioned request whose version is not the current
+		 * ou_rpc_version: leave it on the list */
+		if (cur->our_th->ot_version != 0 &&
+		    cur->our_th->ot_version != ou->ou_rpc_version)
+			continue;
+
+		list_del_init(&cur->our_list);
+		*ourp = cur;
+		found = true;
+		break;
+	}
+	spin_unlock(&ou->ou_lock);
+
+	return found;
+}
+
+/**
+ * Advance ou_rpc_version past a versioned transaction.
+ *
+ * A thandle with version 0 leaves ou_rpc_version untouched. For any other
+ * thandle the version must equal the current ou_rpc_version (asserted),
+ * and ou_rpc_version is then incremented under ou_lock.
+ *
+ * \param[in] ou	osp update structure holding ou_rpc_version
+ * \param[in] oth	osp thandle that has just been processed
+ */
+static void osp_update_rpc_version(struct osp_updates *ou,
+				   struct osp_thandle *oth)
+{
+	if (oth->ot_version != 0) {
+		LASSERT(oth->ot_version == ou->ou_rpc_version);
+		spin_lock(&ou->ou_lock);
+		ou->ou_rpc_version++;
+		spin_unlock(&ou->ou_lock);
+	}
+}
+
+/**
+ * Sending update thread
+ *
+ * Main function of the thread that sends update requests to other MDTs.
+ * This thread pulls update requests out of the sending list in OSP by
+ * version number (see osp_get_next_request()), i.e. it makes sure the
+ * update request with lower version number will be sent first.
+ *
+ * \param[in] arg hold the OSP device.
+ *
+ * \retval 0 when the thread leaves its loop because it is
+ * no longer marked as running.
+ * \retval negative errno if lu_env_init() fails.
+ */
+int osp_send_update_thread(void *arg)
+{
+ struct lu_env env;
+ struct osp_device *osp = arg;
+ struct l_wait_info lwi = { 0 };
+ struct osp_updates *ou = osp->opd_update;
+ struct ptlrpc_thread *thread = &osp->opd_update_thread;
+ struct osp_update_request *our = NULL;
+ int rc;
+ ENTRY;
+
+ LASSERT(ou != NULL);
+ rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
+ /* NOTE(review): on this failure path t_flags is not updated and
+ * t_ctl_waitq is not woken; confirm that the code starting this
+ * thread does not wait for either. */
+ if (rc < 0) {
+ CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
+ rc);
+ RETURN(rc);
+ }
+
+ /* announce that the thread is up to whoever waits on t_ctl_waitq */
+ thread->t_flags = SVC_RUNNING;
+ wake_up(&thread->t_ctl_waitq);
+ while (1) {
+ our = NULL;
+ /* sleep until the thread is no longer running or a request
+ * has been dequeued into 'our' */
+ l_wait_event(ou->ou_waitq,
+ !osp_send_update_thread_running(osp) ||
+ osp_get_next_request(ou, &our),
+ &lwi);
+
+ if (!osp_send_update_thread_running(osp)) {
+ /* a request may have been dequeued just before the
+ * stop was noticed: fail its callbacks with -EINTR
+ * and drop the thandle reference */
+ if (our != NULL && our->our_th != NULL) {
+ osp_trans_callback(&env, our->our_th, -EINTR);
+ osp_thandle_put(our->our_th);
+ }
+ break;
+ }
+
+ /* requests already marked as sent (our_req_sent != 0) are
+ * not sent again here */
+ if (our->our_req_sent == 0) {
+ if (our->our_th != NULL &&
+ our->our_th->ot_super.th_result != 0)
+ osp_trans_callback(&env, our->our_th,
+ our->our_th->ot_super.th_result);
+ else
+ /* the value stored in rc is not examined
+ * afterwards in this function */
+ rc = osp_send_update_req(&env, osp, our);
+ }
+
+ if (our->our_th != NULL) {
+ /* Update the rpc version */
+ osp_update_rpc_version(ou, our->our_th);
+ /* Balanced for thandle_get in osp_trans_trigger() */
+ osp_thandle_put(our->our_th);
+ }
+ }
+
+ /* announce that the thread has stopped, after releasing the env */
+ thread->t_flags = SVC_STOPPED;
+ lu_env_fini(&env);
+ wake_up(&thread->t_ctl_waitq);
+
+ RETURN(0);
+}
+
+/**
+ * Trigger the request for remote updates.
+ *
+ * Take a reference on the thandle, append its update request to the tail
+ * of the OSP sending list and wake up the waiters on ou_waitq (the osp
+ * update sending thread waits there).
+ *
+ * \param[in] env	pointer to the thread context
+ * \param[in] osp	pointer to the OSP device
+ * \param[in] oth	pointer to the transaction handler
+ */
+static void osp_trans_trigger(const struct lu_env *env,
+			      struct osp_device *osp,
+			      struct osp_thandle *oth)
+{
+	struct osp_updates *ou;
+
+	CDEBUG(D_INFO, "%s: add oth %p with version "LPU64"\n",
+	       osp->opd_obd->obd_name, oth, oth->ot_version);
+
+	LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
+	/* reference held while the request sits on the sending list */
+	osp_thandle_get(oth);
+	LASSERT(oth->ot_our != NULL);
+
+	ou = osp->opd_update;
+	spin_lock(&ou->ou_lock);
+	list_add_tail(&oth->ot_our->our_list, &ou->ou_list);
+	spin_unlock(&ou->ou_lock);
+
+	wake_up(&ou->ou_waitq);
+}
+
+/**
* The OSP layer dt_device_operations::dt_trans_start() interface
* to start the transaction.
*
struct thandle *th)
{
struct osp_thandle *oth = thandle_to_osp_thandle(th);
- struct dt_update_request *dt_update;
+ struct osp_update_request *our = oth->ot_our;
+ struct osp_device *osp = dt2osp_dev(dt);
int rc = 0;
- int sent = 0;
ENTRY;
/* For remote transaction, if there is local storage thandle,
oth->ot_storage_th = NULL;
}
- dt_update = oth->ot_dur;
- if (dt_update == NULL || th->th_result != 0) {
- rc = th->th_result;
- GOTO(out, rc);
+ if (our == NULL || our->our_req == NULL ||
+ our->our_req->ourq_count == 0) {
+ osp_trans_callback(env, oth, th->th_result);
+ GOTO(out, rc = th->th_result);
}
- LASSERT(dt_update != LP_POISON);
-
- /* If there are no updates, destroy dt_update and thandle */
- if (dt_update->dur_buf.ub_req == NULL ||
- dt_update->dur_buf.ub_req->ourq_count == 0)
+ if (!osp->opd_connect_mdt) {
+ rc = osp_send_update_req(env, osp, oth->ot_our);
GOTO(out, rc);
-
- if (is_only_remote_trans(th) && !th->th_sync &&
- !th->th_wait_submit) {
- struct osp_device *osp = dt2osp_dev(th->th_dev);
- struct client_obd *cli = &osp->opd_obd->u.cli;
-
- rc = obd_get_request_slot(cli);
- if (rc != 0)
- GOTO(out, rc);
-
- if (!osp->opd_imp_active || !osp->opd_imp_connected) {
- obd_put_request_slot(cli);
- GOTO(out, rc = -ENOTCONN);
- }
-
- rc = osp_trans_trigger(env, dt2osp_dev(dt),
- dt_update, th, &sent);
- if (rc != 0)
- obd_put_request_slot(cli);
- } else {
- rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update,
- th, &sent);
}
-out:
- /* If RPC is triggered successfully, dt_update will be freed in
- * osp_update_interpreter() */
- if (sent == 0) {
- struct osp_update_callback *ouc;
- struct osp_update_callback *next;
+ if (osp->opd_update == NULL ||
+ !osp_send_update_thread_running(osp)) {
+ osp_trans_callback(env, oth, -EIO);
+ GOTO(out, rc = -EIO);
+ }
- if (dt_update != NULL) {
- list_for_each_entry_safe(ouc, next,
- &dt_update->dur_cb_items,
- ouc_list) {
- list_del_init(&ouc->ouc_list);
- if (ouc->ouc_interpreter != NULL)
- ouc->ouc_interpreter(env, NULL, NULL,
- ouc->ouc_obj,
- ouc->ouc_data, 0,
- rc);
- osp_update_callback_fini(env, ouc);
- }
- }
- osp_trans_commit_cb(oth, rc);
- dt_update_request_destroy(dt_update);
- oth->ot_dur = NULL;
+ if (th->th_sync) {
+ /* if th_sync is set, then it needs to be sent
+ * right away. Note: even though the RPC has been
+ * sent, it still needs to be added to the sending
+ * list (see osp_trans_trigger()), so ou_rpc_version
+ * can be updated correctly. */
+ rc = osp_send_update_req(env, osp, our);
+ our->our_req_sent = 1;
}
+ osp_trans_trigger(env, osp, oth);
+out:
osp_thandle_put(oth);
RETURN(rc);
lbuf->lb_len = buf_len;
tmp = object_update_param_get(update, 2, &size);
- if (tmp == NULL || size != sizeof(*tmp)) {
+ if (tmp == NULL || IS_ERR(tmp) || size != sizeof(*tmp)) {
CERROR("%s: emptry or wrong size %zu flag: rc = %d\n",
tgt_name(tsi->tsi_tgt), size, -EPROTO);
RETURN(err_serious(-EPROTO));
GOTO(out, rc = -ENOENT);
tmp = object_update_param_get(update, 0, NULL);
- if (tmp == NULL) {
+ if (tmp == NULL || IS_ERR(tmp)) {
CERROR("%s: empty size for read: rc = %d\n",
tgt_name(tsi->tsi_tgt), -EPROTO);
GOTO(out, rc = err_serious(-EPROTO));
size = le64_to_cpu(*(size_t *)(tmp));
tmp = object_update_param_get(update, 1, NULL);
- if (tmp == NULL) {
+ if (tmp == NULL || IS_ERR(tmp)) {
CERROR("%s: empty pos for read: rc = %d\n",
tgt_name(tsi->tsi_tgt), -EPROTO);
GOTO(out, rc = err_serious(-EPROTO));
}
if (rc == -EEXIST) {
- dtrq_destory(dtrq);
+ dtrq_destroy(dtrq);
rc = 0;
goto again;
}
*
* \param[in] dtrq distribute txn replqy req to be destroyed.
*/
-void dtrq_destory(struct distribute_txn_replay_req *dtrq)
+void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
{
struct distribute_txn_replay_req_sub *dtrqs;
struct distribute_txn_replay_req_sub *tmp;
OBD_FREE_PTR(dtrq);
}
-EXPORT_SYMBOL(dtrq_destory);
+EXPORT_SYMBOL(dtrq_destroy);
/**
* Destroy all of replay req.
list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list,
dtrq_list) {
list_del_init(&dtrq->dtrq_list);
- dtrq_destory(dtrq);
+ dtrq_destroy(dtrq);
}
spin_unlock(&tdtd->tdtd_replay_list_lock);
}
struct top_thandle *top_th;
struct top_multiple_thandle *tmt;
struct thandle_update_records *tur = NULL;
- unsigned int i;
+ int i;
int rc = 0;
ENTRY;
CDEBUG(D_HA, "error during execution of #%u from"
" %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
ta->ta_args[i]->line, rc);
- while (--i >= 0) {
+ while (--i > 0) {
if (ta->ta_args[i]->undo_fn != NULL) {
dt_obj = ta->ta_args[i]->object;
sub_dt =
* Mark the sub thandle to be committed and if all sub thandle are committed
* notify the top thandle.
*
- * \param[in] sub_th sub thandle being committed.
+ * \param[in] env execution environment
+ * \param[in] sub_th sub thandle being committed
+ * \param[in] cb commit callback
+ * \param[in] err trans result
*/
static void sub_trans_commit_cb(struct lu_env *env,
struct thandle *sub_th,
}
/**
+ * Sub thandle stop call back
+ *
+ * After sub thandle is stopped, it will call this callback to notify
+ * the top thandle: the first not-yet-stopped sub thandle whose device
+ * matches \a sub_th is marked as stopped and records \a err, then the
+ * waiters on tmt_stop_waitq are woken up.
+ *
+ * \param[in] env	execution environment (not used by this callback)
+ * \param[in] sub_th	sub thandle that has been stopped
+ * \param[in] cb	stop callback; its dcb_data is the
+ *			top_multiple_thandle
+ * \param[in] err	result of sub trans
+ */
+static void sub_trans_stop_cb(struct lu_env *env,
+			      struct thandle *sub_th,
+			      struct dt_txn_commit_cb *cb, int err)
+{
+	struct top_multiple_thandle	*tmt = cb->dcb_data;
+	struct sub_thandle		*st;
+	ENTRY;
+
+	list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
+		if (!st->st_stopped && st->st_dt == sub_th->th_dev) {
+			st->st_stopped = 1;
+			st->st_result = err;
+			break;
+		}
+	}
+
+	wake_up(&tmt->tmt_stop_waitq);
+	RETURN_EXIT;
+}
+
+/**
+ * Register the stop callback of a sub thandle.
+ *
+ * Fill in the stop callback embedded in \a st so that it calls
+ * sub_trans_stop_cb() with \a tmt as its data, flag it as DCB_TRANS_STOP
+ * and add it to the sub transaction with dt_trans_cb_add().
+ *
+ * \param[in] st	sub thandle owning the callback
+ * \param[in] tmt	top multiple thandle passed to the callback
+ */
+static void sub_thandle_register_stop_cb(struct sub_thandle *st,
+					 struct top_multiple_thandle *tmt)
+{
+	struct dt_txn_commit_cb *dcb = &st->st_stop_dcb;
+
+	dcb->dcb_func = sub_trans_stop_cb;
+	dcb->dcb_data = tmt;
+	dcb->dcb_flags = DCB_TRANS_STOP;
+	INIT_LIST_HEAD(&dcb->dcb_linkage);
+	dt_trans_cb_add(st->st_sub_th, dcb);
+}
+
+/**
* Create sub thandle
*
* Create transaction handle for sub_thandle
return 0;
}
-
/**
* Create the top transaction.
*
if (rc != 0)
GOTO(out, rc);
+ sub_thandle_register_stop_cb(st, tmt);
sub_thandle_register_commit_cb(st, tmt);
}
out:
}
/**
+ * Check if top transaction is stopped
+ *
+ * The top transaction is stopped only if no sub thandle on its list has
+ * a sub transaction (st_sub_th) that is not yet marked as stopped.
+ *
+ * As a side effect, while walking the list the first non-zero st_result
+ * met is copied into the top thandle's th_result if that is still 0;
+ * entries after the first unstopped one are not looked at.
+ *
+ * \param [in] top_th	top thandle
+ *
+ * \retval		true if the top transaction is stopped.
+ * \retval		false if the top transaction is not stopped.
+ */
+static bool top_trans_is_stopped(struct top_thandle *top_th)
+{
+	struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle;
+	struct sub_thandle *st;
+
+	list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
+		/* has a sub transaction which is not stopped yet */
+		if (!st->st_stopped && st->st_sub_th != NULL)
+			return false;
+
+		if (st->st_result != 0 &&
+		    top_th->tt_super.th_result == 0)
+			top_th->tt_super.th_result = st->st_result;
+	}
+
+	return true;
+}
+
+/**
+ * Wait result of top transaction
+ *
+ * Wait on tmt_stop_waitq until top_trans_is_stopped() reports that all
+ * sub transactions are stopped.
+ *
+ * \param [in] top_th	top thandle.
+ *
+ * \retval		the result of top thandle (th_result).
+ */
+static int top_trans_wait_result(struct top_thandle *top_th)
+{
+	struct l_wait_info	lwi = {0};
+	/* pairs with the RETURN() below so the trace log stays balanced */
+	ENTRY;
+
+	l_wait_event(top_th->tt_multiple_thandle->tmt_stop_waitq,
+		     top_trans_is_stopped(top_th), &lwi);
+
+	RETURN(top_th->tt_super.th_result);
+}
+
+/**
* Stop the top transaction.
*
* Stop the transaction on the master device first, then stop transactions
th->th_result = rc;
}
+ rc = top_trans_wait_result(top_th);
+
tmt->tmt_result = rc;
+
/* Balance for the refcount in top_trans_create, Note: if it is NOT
* multiple node transaction, the top transaction will be destroyed. */
top_multiple_thandle_put(tmt);
INIT_LIST_HEAD(&tmt->tmt_commit_list);
atomic_set(&tmt->tmt_refcount, 1);
+ init_waitqueue_head(&tmt->tmt_stop_waitq);
top_th->tt_multiple_thandle = tmt;
return 0;
return st;
st->st_sub_th = sub_th;
+
sub_th->th_top = &top_th->tt_super;
return st;
}
if (rc < 0)
GOTO(stop, rc);
- dt_trans_cb_add(th, &dtbd->dtbd_cb);
+ rc = dt_trans_cb_add(th, &dtbd->dtbd_cb);
if (rc < 0)
GOTO(stop, rc);
dt_obj = dt_find_or_create(env, lut->lut_bottom, fid, dof,
attr);
- if (IS_ERR(dt_obj))
- GOTO(out_put, rc = PTR_ERR(dt_obj));
+ if (IS_ERR(dt_obj)) {
+ rc = PTR_ERR(dt_obj);
+ dt_obj = NULL;
+ GOTO(out_put, rc);
+ }
tdtd->tdtd_batchid_obj = dt_obj;