struct object_update ourq_updates[0];
};
+#define OUT_UPDATE_HEADER_MAGIC 0xBDDF0001
+/* Header for updates request between MDTs.
+ * In the OUT_UPDATE request it directly follows the ptlrpc body and is
+ * itself followed by an array of ouh_count struct out_update_buffer
+ * entries, one per update buffer transferred via bulk. */
+struct out_update_header {
+ __u32 ouh_magic; /* OUT_UPDATE_HEADER_MAGIC */
+ __u32 ouh_count; /* number of update buffers described in the request */
+};
+
+struct out_update_buffer { /* descriptor of one update buffer sent via bulk */
+ __u32 oub_size; /* size in bytes of the update buffer */
+ __u32 oub_padding; /* byte-swapped with the rest; presumably alignment padding - confirm */
+};
+
void lustre_swab_object_update(struct object_update *ou);
void lustre_swab_object_update_request(struct object_update_request *our);
+void lustre_swab_out_update_header(struct out_update_header *ouh);
+void lustre_swab_out_update_buffer(struct out_update_buffer *oub);
static inline size_t
object_update_params_size(const struct object_update *update)
extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops;
extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops;
+extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kvec_ops;
/*
* Definition of bulk descriptor.
/* OBJ update format */
extern struct req_msg_field RMF_OUT_UPDATE;
extern struct req_msg_field RMF_OUT_UPDATE_REPLY;
+extern struct req_msg_field RMF_OUT_UPDATE_HEADER;
+extern struct req_msg_field RMF_OUT_UPDATE_BUF;
/* LFSCK format */
extern struct req_msg_field RMF_LFSCK_REQUEST;
}
static inline void
-object_update_reply_init(struct object_update_reply *reply, size_t count)
-{
- reply->ourp_magic = UPDATE_REPLY_MAGIC;
- reply->ourp_count = count;
-}
-
-static inline void
object_update_result_insert(struct object_update_reply *reply,
void *data, size_t data_len, size_t index,
int rc)
/* target/out_lib.c */
int out_update_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, enum update_type op,
+ size_t *max_update_size, enum update_type op,
const struct lu_fid *fid, unsigned int params_count,
__u16 *param_sizes, const void **param_bufs);
int out_create_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid,
+ size_t *max_update_size, const struct lu_fid *fid,
const struct lu_attr *attr, struct dt_allocation_hint *hint,
struct dt_object_format *dof);
int out_object_destroy_pack(const struct lu_env *env,
struct object_update *update,
- size_t max_update_size,
+ size_t *max_update_size,
const struct lu_fid *fid);
int out_index_delete_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct dt_key *key);
int out_index_insert_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct dt_rec *rec,
const struct dt_key *key);
int out_xattr_set_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct lu_buf *buf,
const char *name, __u32 flag);
int out_xattr_del_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const char *name);
int out_attr_set_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct lu_attr *attr);
int out_ref_add_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid);
int out_ref_del_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid);
int out_write_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct lu_buf *buf,
__u64 pos);
int out_attr_get_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid);
int out_index_lookup_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, struct dt_rec *rec,
const struct dt_key *key);
int out_xattr_get_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const char *name);
int out_read_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_length, const struct lu_fid *fid,
+ size_t *max_update_length, const struct lu_fid *fid,
size_t size, loff_t pos);
const char *update_op_str(__u16 opcode);
__u64 transno = lustre_msg_get_transno(req->rq_reqmsg);
struct obd_export *exp = req->rq_export;
struct ptlrpc_request *reqiter;
+ struct ptlrpc_request *dup_req = NULL;
int dup = 0;
LASSERT(exp);
list_for_each_entry(reqiter, &exp->exp_req_replay_queue,
rq_replay_list) {
if (lustre_msg_get_transno(reqiter->rq_reqmsg) == transno) {
+ dup_req = reqiter;
dup = 1;
break;
}
(MSG_RESENT | MSG_REPLAY)) != (MSG_RESENT | MSG_REPLAY))
CERROR("invalid flags %x of resent replay\n",
lustre_msg_get_flags(req->rq_reqmsg));
+
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+ __u32 new_conn;
+
+ new_conn = lustre_msg_get_conn_cnt(req->rq_reqmsg);
+ if (new_conn >
+ lustre_msg_get_conn_cnt(dup_req->rq_reqmsg))
+ lustre_msg_set_conn_cnt(dup_req->rq_reqmsg,
+ new_conn);
+ }
} else {
list_add_tail(&req->rq_replay_list,
&exp->exp_req_replay_queue);
int osp_pre_recovering;
};
+struct osp_update_request_sub { /* one update buffer of an osp_update_request */
+ struct object_update_request *ours_req; /* buffer the object updates are packed into */
+ size_t ours_req_size; /* allocated size of ours_req in bytes */
+ /* Linked to osp_update_request->our_req_list */
+ struct list_head ours_list;
+};
+
/**
* Tracking the updates being executed on this dt_device.
*/
/* update request result */
int our_rc;
- /* Holding object updates sent to the remote target */
- struct object_update_request *our_req;
- size_t our_req_size;
+ /* List of osp_update_request_sub */
+ struct list_head our_req_list;
struct list_head our_cb_items;
int osp_extend_update_buffer(const struct lu_env *env,
struct osp_update_request *our);
-#define osp_update_rpc_pack(env, name, update, op, ...) \
-({ \
- struct object_update *object_update; \
- size_t max_update_length; \
- struct object_update_request *ureq; \
- int ret; \
- \
+struct osp_update_request_sub *
+osp_current_object_update_request(struct osp_update_request *our);
+
+int osp_object_update_request_create(struct osp_update_request *our,
+ size_t size);
+
+#define osp_update_rpc_pack(env, name, our, op, ...) \
+({ \
+ struct object_update *object_update; \
+ size_t max_update_length; \
+ struct osp_update_request_sub *ours; \
+ int ret; \
+ \
while (1) { \
- ureq = update->our_req; \
- max_update_length = update->our_req_size - \
- object_update_request_size(ureq); \
+ ours = osp_current_object_update_request(our); \
+ LASSERT(ours != NULL); \
+ max_update_length = ours->ours_req_size - \
+ object_update_request_size(ours->ours_req); \
\
- object_update = update_buffer_get_update(ureq, \
- ureq->ourq_count); \
+ object_update = update_buffer_get_update(ours->ours_req,\
+ ours->ours_req->ourq_count); \
ret = out_##name##_pack(env, object_update, \
- max_update_length, \
+ &max_update_length, \
__VA_ARGS__); \
if (ret == -E2BIG) { \
int rc1; \
- /* extend the buffer and retry */ \
- rc1 = osp_extend_update_buffer(env, update); \
+ /* Create new object update request */ \
+ rc1 = osp_object_update_request_create(our, \
+ max_update_length + \
+ offsetof(struct object_update_request, \
+ ourq_updates[0]) + 1); \
if (rc1 != 0) { \
ret = rc1; \
break; \
} \
+ continue; \
} else { \
if (ret == 0) { \
+ ours->ours_req->ourq_count++; \
object_update->ou_flags |= \
update->our_flags; \
- ureq->ourq_count++; \
} \
break; \
} \
}
int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
- const struct object_update_request *ureq,
+ struct osp_update_request *our,
struct ptlrpc_request **reqp);
int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
struct osp_update_request *update,
return ptr;
}
-int osp_extend_update_buffer(const struct lu_env *env,
- struct osp_update_request *our)
-{
- struct object_update_request *obj_update_req;
- size_t new_size = our->our_req_size + OUT_UPDATE_BUFFER_SIZE_ADD;
-
- /* enlarge object update request size */
- if (new_size > OUT_UPDATE_BUFFER_SIZE_MAX)
- return -E2BIG;
-
- OBD_ALLOC_LARGE(obj_update_req, new_size);
- if (obj_update_req == NULL)
- return -ENOMEM;
-
- memcpy(obj_update_req, our->our_req, our->our_req_size);
-
- OBD_FREE_LARGE(our->our_req, our->our_req_size);
-
- our->our_req = obj_update_req;
- our->our_req_size = new_size;
-
- return 0;
-}
-
/**
* Implementation of dt_object_operations::do_create
*
mutex_unlock(&osp->opd_async_requests_mutex);
osp_oac_xattr_put(oxe);
} else {
- struct osp_update_request *update;
+ struct osp_update_request *our;
+ struct osp_update_request_sub *ours;
/* XXX: Currently, we trigger the batched async OUT
* RPC via dt_declare_xattr_get(). It is not
* perfect solution, but works well now.
*
* We will improve it in the future. */
- update = osp->opd_async_requests;
- if (update != NULL && update->our_req != NULL &&
- update->our_req->ourq_count > 0) {
+ our = osp->opd_async_requests;
+ ours = osp_current_object_update_request(our);
+ if (ours != NULL && ours->ours_req != NULL &&
+ ours->ours_req->ourq_count > 0) {
osp->opd_async_requests = NULL;
mutex_unlock(&osp->opd_async_requests_mutex);
- rc = osp_unplug_async_request(env, osp, update);
+ rc = osp_unplug_async_request(env, osp, our);
} else {
mutex_unlock(&osp->opd_async_requests_mutex);
}
#define DEBUG_SUBSYSTEM S_MDS
+#include <lustre_net.h>
#include "osp_internal.h"
/**
return ourq;
}
-static void object_update_request_free(struct object_update_request *ourq,
- size_t ourq_size)
+/**
+ * Allocate new update request
+ *
+ * Allocate a new osp_update_request_sub together with an object update
+ * request buffer of \a size bytes, and append it to the tail of
+ * \a our->our_req_list so that it becomes the current update request.
+ *
+ * \param[in] our	osp_update_request where to create a new
+ *			update request
+ * \param[in] size	size in bytes of the new object update request
+ *			buffer
+ *
+ * \retval		0 if creation succeeds.
+ * \retval		negative errno if creation fails.
+ */
+int osp_object_update_request_create(struct osp_update_request *our,
+				     size_t size)
{
- if (ourq != NULL)
- OBD_FREE_LARGE(ourq, ourq_size);
+	struct osp_update_request_sub *ours;
+
+	OBD_ALLOC_PTR(ours);
+	if (ours == NULL)
+		return -ENOMEM;
+
+	ours->ours_req = object_update_request_alloc(size);
+	if (IS_ERR(ours->ours_req)) {
+		/* fetch the error code before the holder is released, and
+		 * report what the allocator actually returned */
+		int rc = PTR_ERR(ours->ours_req);
+
+		OBD_FREE_PTR(ours);
+		return rc;
+	}
+
+	ours->ours_req_size = size;
+	INIT_LIST_HEAD(&ours->ours_list);
+	/* the tail of the list is always the current update request */
+	list_add_tail(&ours->ours_list, &our->our_req_list);
+
+	return 0;
+}
+
+/**
+ * Get current update request
+ *
+ * Get current object update request from our_req_list in
+ * osp_update_request. New update requests are always appended at the
+ * tail, so the last entry of the list is the current update request.
+ *
+ * \param[in] our	osp update request where to get the
+ *			current object update, may be NULL
+ *
+ * \retval		the current (last) osp_update_request_sub.
+ * \retval		NULL if \a our is NULL or its list is empty.
+ */
+struct osp_update_request_sub *
+osp_current_object_update_request(struct osp_update_request *our)
+{
+	/* osp->opd_async_requests is handed in without a NULL check of its
+	 * own, so tolerate a missing request here instead of crashing */
+	if (our == NULL || list_empty(&our->our_req_list))
+		return NULL;
+
+	return list_entry(our->our_req_list.prev,
+			  struct osp_update_request_sub, ours_list);
}
/**
*/
struct osp_update_request *osp_update_request_create(struct dt_device *dt)
{
- struct osp_update_request *osp_update_req;
- struct object_update_request *ourq;
+	struct osp_update_request *our;
+	int rc;
- OBD_ALLOC_PTR(osp_update_req);
- if (osp_update_req == NULL)
+	OBD_ALLOC_PTR(our);
+	if (our == NULL)
return ERR_PTR(-ENOMEM);
- ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE);
- if (IS_ERR(ourq)) {
- OBD_FREE_PTR(osp_update_req);
- return ERR_CAST(ourq);
- }
-
- osp_update_req->our_req = ourq;
- osp_update_req->our_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
+	INIT_LIST_HEAD(&our->our_req_list);
+	INIT_LIST_HEAD(&our->our_cb_items);
+	INIT_LIST_HEAD(&our->our_list);
- INIT_LIST_HEAD(&osp_update_req->our_cb_items);
- INIT_LIST_HEAD(&osp_update_req->our_list);
-
- return osp_update_req;
+	/* Every request starts with one update buffer. Without it
+	 * osp_current_object_update_request() returns NULL and the packing
+	 * code dereferences that, so fail the creation instead of handing
+	 * out a request with an empty our_req_list. */
+	rc = osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE);
+	if (rc != 0) {
+		OBD_FREE_PTR(our);
+		return ERR_PTR(rc);
+	}
+
+	return our;
}
void osp_update_request_destroy(struct osp_update_request *our)
{
+	struct osp_update_request_sub *ours;
+	struct osp_update_request_sub *tmp;
+
if (our == NULL)
return;
- object_update_request_free(our->our_req,
- our->our_req_size);
+	/* Release every update buffer on the list, then the request itself.
+	 * The _safe iterator is needed because entries are unlinked and
+	 * freed while walking. */
+	list_for_each_entry_safe(ours, tmp, &our->our_req_list, ours_list) {
+		list_del(&ours->ours_list);
+		/* The buffers were released with OBD_FREE_LARGE before this
+		 * change (object_update_request_free); keep the free routine
+		 * paired with the _LARGE allocator. */
+		if (ours->ours_req != NULL)
+			OBD_FREE_LARGE(ours->ours_req, ours->ours_req_size);
+		OBD_FREE_PTR(ours);
+	}
OBD_FREE_PTR(our);
}
ourq->ourq_magic, ourq->ourq_count, total_size);
}
+/**
+ * Prepare update request.
+ *
+ * Prepare OUT update ptlrpc request for all of the updates held by
+ * \a our. The request message itself only carries an out_update_header
+ * and one out_update_buffer descriptor per entry of our->our_req_list;
+ * the update buffers themselves are attached to the request as bulk
+ * fragments (PTLRPC_BULK_GET_SOURCE).
+ *
+ * \param[in] env execution environment
+ * \param[in] imp import on which ptlrpc request will be sent
+ * \param[in] our hold all of updates which will be packed into the req
+ * \param[out] reqp set to the created request on success, left
+ * untouched on failure
+ *
+ * \retval 0 if preparation succeeds.
+ * \retval negative errno if preparation fails.
+ */
+int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+ struct osp_update_request *our,
+ struct ptlrpc_request **reqp)
+{
+ struct ptlrpc_request *req;
+ struct ptlrpc_bulk_desc *desc;
+ struct osp_update_request_sub *ours;
+ struct out_update_header *ouh;
+ struct out_update_buffer *oub;
+ __u32 buf_count = 0;
+ int rc;
+ ENTRY;
+
+ list_for_each_entry(ours, &our->our_req_list, ours_list) {
+ object_update_request_dump(ours->ours_req, D_INFO);
+ buf_count++; /* one bulk fragment and one descriptor per buffer */
+ }
+
+ req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_BUF, RCL_CLIENT,
+ buf_count * sizeof(*oub)); /* descriptor array size is set before the request is packed */
+
+ rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
+ if (rc != 0)
+ GOTO(out_req, rc);
+
+ ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
+ ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
+ ouh->ouh_count = buf_count;
+
+ oub = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_BUF);
+ list_for_each_entry(ours, &our->our_req_list, ours_list) {
+ oub->oub_size = ours->ours_req_size; /* the allocated size, not just the packed part */
+ oub++;
+ }
+
+ req->rq_bulk_write = 1;
+ desc = ptlrpc_prep_bulk_imp(req, buf_count,
+ MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
+ PTLRPC_BULK_GET_SOURCE | PTLRPC_BULK_BUF_KVEC,
+ MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops);
+ if (desc == NULL)
+ GOTO(out_req, rc = -ENOMEM);
+
+ /* NB req now owns desc and will free it when it gets freed */
+ list_for_each_entry(ours, &our->our_req_list, ours_list)
+ desc->bd_frag_ops->add_iov_frag(desc, ours->ours_req,
+ ours->ours_req_size); /* same order and sizes as the descriptors above */
+
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
+ RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+
+ ptlrpc_request_set_replen(req);
+ req->rq_request_portal = OUT_PORTAL;
+ req->rq_reply_portal = OSC_REPLY_PORTAL;
+ *reqp = req;
+
+out_req:
+ if (rc < 0) /* failure: drop the request, *reqp was not written */
+ ptlrpc_req_finished(req);
+
+ RETURN(rc);
+}
+
+/**
+ * Send update RPC and wait for it.
+ *
+ * Build the OUT update request from \a our, send it over the import of
+ * \a osp and block until the RPC completes. The RPC result is recorded
+ * in \a our->our_rc.
+ *
+ * \param[in] env	execution environment
+ * \param[in] osp	OSP device whose import is used to send the request
+ * \param[in] our	hold all of updates which will be packed into the req
+ * \param[out] reqp	if not NULL and the RPC did not fail, receives the
+ *			request, which is then not released here; otherwise
+ *			the request is released before returning
+ *
+ * \retval		non-negative value if RPC succeeds.
+ * \retval		negative errno if RPC fails.
+ */
+int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
+		    struct osp_update_request *our,
+		    struct ptlrpc_request **reqp)
+{
+	struct ptlrpc_request	*request = NULL;
+	int			 result;
+	ENTRY;
+
+	result = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
+				     our, &request);
+	if (result != 0)
+		RETURN(result);
+
+	/* Only read-only updates come through here, and they may be needed
+	 * to fetch the update log while recovery is running, so the request
+	 * must be allowed to go out during recovery. */
+	request->rq_allow_replay = 1;
+
+	/* Some dt index methods (osd_index_ea_lookup for instance) report
+	 * positive values, hence only a negative result means failure. */
+	result = ptlrpc_queue_wait(request);
+	our->our_rc = result;
+
+	if (result >= 0 && reqp != NULL)
+		*reqp = request;
+	else
+		ptlrpc_req_finished(request);
+
+	RETURN(result);
+}
+
static void osp_trans_stop_cb(struct osp_thandle *oth, int result)
{
struct dt_txn_commit_cb *dcb;
int rc;
rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
- our->our_req, &req);
+ our, &req);
if (rc != 0) {
struct osp_update_callback *ouc;
struct osp_update_callback *next;
struct object_update *object_update;
size_t max_update_size;
struct object_update_request *ureq;
+ struct osp_update_request_sub *ours;
int rc = 0;
ENTRY;
RETURN(PTR_ERR(our));
again:
- ureq = our->our_req;
- max_update_size = our->our_req_size - object_update_request_size(ureq);
+ ours = osp_current_object_update_request(our);
+
+ ureq = ours->ours_req;
+ max_update_size = ours->ours_req_size -
+ object_update_request_size(ureq);
object_update = update_buffer_get_update(ureq, ureq->ourq_count);
- rc = out_update_pack(env, object_update, max_update_size, op,
+ rc = out_update_pack(env, object_update, &max_update_size, op,
lu_object_fid(osp2lu_obj(obj)), count, lens, bufs);
/* The queue is full. */
if (rc == -E2BIG) {
}
/**
- * Prepare update request.
- *
- * Prepare OUT update ptlrpc request, and the request usually includes
- * all of updates (stored in \param ureq) from one operation.
- *
- * \param[in] env execution environment
- * \param[in] imp import on which ptlrpc request will be sent
- * \param[in] ureq hold all of updates which will be packed into the req
- * \param[in] reqp request to be created
- *
- * \retval 0 if preparation succeeds.
- * \retval negative errno if preparation fails.
- */
-int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
- const struct object_update_request *ureq,
- struct ptlrpc_request **reqp)
-{
- struct ptlrpc_request *req;
- struct object_update_request *tmp;
- size_t ureq_len;
- int rc;
- ENTRY;
-
- object_update_request_dump(ureq, D_INFO);
- req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- ureq_len = object_update_request_size(ureq);
- req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
- ureq_len);
-
- rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
- if (rc != 0) {
- ptlrpc_req_finished(req);
- RETURN(rc);
- }
-
- req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
- RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
-
- tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
- memcpy(tmp, ureq, ureq_len);
-
- ptlrpc_request_set_replen(req);
- req->rq_request_portal = OUT_PORTAL;
- req->rq_reply_portal = OSC_REPLY_PORTAL;
- *reqp = req;
-
- RETURN(rc);
-}
-
-/**
- * Send update RPC.
- *
- * Send update request to the remote MDT synchronously.
- *
- * \param[in] env execution environment
- * \param[in] imp import on which ptlrpc request will be sent
- * \param[in] our hold all of updates which will be packed into the req
- * \param[in] reqp request to be created
- *
- * \retval 0 if RPC succeeds.
- * \retval negative errno if RPC fails.
- */
-
-int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
- struct osp_update_request *our,
- struct ptlrpc_request **reqp)
-{
- struct obd_import *imp = osp->opd_obd->u.cli.cl_import;
- struct ptlrpc_request *req = NULL;
- int rc;
- ENTRY;
-
- rc = osp_prep_update_req(env, imp, our->our_req, &req);
- if (rc != 0)
- RETURN(rc);
-
- /* This will only be called with read-only update, and these updates
- * might be used to retrieve update log during recovery process, so
- * it will be allowed to send during recovery process */
- req->rq_allow_replay = 1;
-
- /* Note: some dt index api might return non-zero result here, like
- * osd_index_ea_lookup, so we should only check rc < 0 here */
- rc = ptlrpc_queue_wait(req);
- if (rc < 0) {
- ptlrpc_req_finished(req);
- our->our_rc = rc;
- RETURN(rc);
- }
-
- if (reqp != NULL) {
- *reqp = req;
- RETURN(rc);
- }
-
- our->our_rc = rc;
-
- ptlrpc_req_finished(req);
-
- RETURN(rc);
-}
-
-/**
* Add commit callback to transaction.
*
* Add commit callback to the osp thandle, which will be called
LASSERT(oth != NULL);
LASSERT(our->our_req_sent == 0);
rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
- our->our_req, &req);
+ our, &req);
if (rc != 0) {
osp_trans_callback(env, oth, rc);
RETURN(rc);
oth->ot_storage_th = NULL;
}
- if (our == NULL || our->our_req == NULL ||
- our->our_req->ourq_count == 0) {
+ if (our == NULL || list_empty(&our->our_req_list)) {
osp_trans_callback(env, oth, th->th_result);
GOTO(out, rc = th->th_result);
}
};
EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
+const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kvec_ops = { /* fragment ops passed along with PTLRPC_BULK_BUF_KVEC descriptors */
+ .add_iov_frag = ptlrpc_prep_bulk_frag, /* appends one plain memory buffer to the descriptor */
+};
+EXPORT_SYMBOL(ptlrpc_bulk_kvec_ops);
+
static int ptlrpc_send_new_req(struct ptlrpc_request *req);
static int ptlrpcd_check_work(struct ptlrpc_request *req);
static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
void *frag, int len)
{
struct kvec *iovec;
+ ENTRY;
LASSERT(desc->bd_iov_count < desc->bd_max_iov);
LASSERT(frag != NULL);
desc->bd_iov_count++;
- return desc->bd_nob;
+ RETURN(desc->bd_nob);
}
EXPORT_SYMBOL(ptlrpc_prep_bulk_frag);
static const struct req_msg_field *mds_update_client[] = {
&RMF_PTLRPC_BODY,
- &RMF_OUT_UPDATE,
+ &RMF_OUT_UPDATE_HEADER,
+ &RMF_OUT_UPDATE_BUF,
};
static const struct req_msg_field *mds_update_server[] = {
lustre_swab_lfsck_reply, NULL);
EXPORT_SYMBOL(RMF_LFSCK_REPLY);
+struct req_msg_field RMF_OUT_UPDATE_HEADER = DEFINE_MSGF("out_update", 0, /* fixed-size header field of the OUT update request */
+ sizeof(struct out_update_header),
+ lustre_swab_out_update_header, NULL);
+EXPORT_SYMBOL(RMF_OUT_UPDATE_HEADER);
+
+struct req_msg_field RMF_OUT_UPDATE_BUF = DEFINE_MSGF("update_buf", /* array of struct out_update_buffer; total size is set by the sender per request */
+ RMF_F_STRUCT_ARRAY, sizeof(struct out_update_buffer),
+ lustre_swab_out_update_buffer, NULL);
+EXPORT_SYMBOL(RMF_OUT_UPDATE_BUF);
+
/*
* Request formats.
*/
EXPORT_SYMBOL(RQF_MDS_GET_INFO);
struct req_format RQF_OUT_UPDATE =
- DEFINE_REQ_FMT0("OUT_UPDATE_OBJ", mds_update_client,
+ DEFINE_REQ_FMT0("OUT_UPDATE", mds_update_client,
mds_update_server);
EXPORT_SYMBOL(RQF_OUT_UPDATE);
}
}
+void lustre_swab_out_update_header(struct out_update_header *ouh)
+{ /* byte-swap both 32-bit fields of the OUT update header in place */
+ __swab32s(&ouh->ouh_magic);
+ __swab32s(&ouh->ouh_count);
+}
+EXPORT_SYMBOL(lustre_swab_out_update_header);
+
+void lustre_swab_out_update_buffer(struct out_update_buffer *oub)
+{ /* byte-swap both 32-bit fields of one update buffer descriptor in place */
+ __swab32s(&oub->oub_size);
+ __swab32s(&oub->oub_padding);
+}
+EXPORT_SYMBOL(lustre_swab_out_update_buffer);
+
void lustre_swab_swap_layouts(struct mdc_swap_layouts *msl)
{
__swab64s(&msl->msl_flags);
struct thandle_exec_args *ta = &tti->tti_tea;
struct req_capsule *pill = tsi->tsi_pill;
struct dt_device *dt = tsi->tsi_tgt->lut_bottom;
- struct object_update_request *ureq;
+ struct out_update_header *ouh;
+ struct out_update_buffer *oub;
struct object_update *update;
struct object_update_reply *reply;
- int bufsize;
- int count;
- int current_batchid = -1;
- int i;
- int rc = 0;
- int rc1 = 0;
+ struct ptlrpc_bulk_desc *desc;
+ struct l_wait_info lwi;
+ void **update_bufs;
+ int current_batchid = -1;
+ __u32 update_buf_count;
+ unsigned int i;
+ unsigned int reply_index = 0;
+ int rc = 0;
+ int rc1 = 0;
ENTRY;
req_capsule_set(pill, &RQF_OUT_UPDATE);
- ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
- if (ureq == NULL) {
+ ouh = req_capsule_client_get(pill, &RMF_OUT_UPDATE_HEADER);
+ if (ouh == NULL) {
CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
-EPROTO);
RETURN(err_serious(-EPROTO));
}
- bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
- if (bufsize != object_update_request_size(ureq)) {
- CERROR("%s: invalid bufsize %d: rc = %d\n",
- tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
- RETURN(err_serious(-EPROTO));
- }
-
- if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
+ if (ouh->ouh_magic != OUT_UPDATE_HEADER_MAGIC) {
CERROR("%s: invalid update buffer magic %x expect %x: "
- "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
+ "rc = %d\n", tgt_name(tsi->tsi_tgt), ouh->ouh_magic,
UPDATE_REQUEST_MAGIC, -EPROTO);
RETURN(err_serious(-EPROTO));
}
- count = ureq->ourq_count;
- if (count <= 0) {
+ update_buf_count = ouh->ouh_count;
+ if (update_buf_count == 0) {
CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
-EPROTO);
RETURN(err_serious(-EPROTO));
RETURN(rc);
}
+ OBD_ALLOC(update_bufs, sizeof(*update_bufs) * update_buf_count);
+ if (update_bufs == NULL)
+ RETURN(-ENOMEM);
+
+ oub = req_capsule_client_get(pill, &RMF_OUT_UPDATE_BUF);
+ desc = ptlrpc_prep_bulk_exp(pill->rc_req, update_buf_count,
+ PTLRPC_BULK_OPS_COUNT,
+ PTLRPC_BULK_GET_SINK |
+ PTLRPC_BULK_BUF_KVEC,
+ MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops);
+ if (desc == NULL)
+ GOTO(out_free, rc = -ENOMEM);
+
+ /* NB Having prepped, we must commit... */
+ for (i = 0; i < update_buf_count; i++, oub++) {
+ OBD_ALLOC(update_bufs[i], oub->oub_size);
+ if (update_bufs[i] == NULL)
+ GOTO(out_free, rc = -ENOMEM);
+
+ desc->bd_frag_ops->add_iov_frag(desc, update_bufs[i],
+ oub->oub_size);
+ }
+
+ pill->rc_req->rq_bulk_write = 1;
+ rc = sptlrpc_svc_prep_bulk(pill->rc_req, desc);
+ if (rc != 0)
+ GOTO(out_free, rc);
+
+ rc = target_bulk_io(pill->rc_req->rq_export, desc, &lwi);
+ if (rc < 0)
+ GOTO(out_free, rc);
+
/* Prepare the update reply buffer */
reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
if (reply == NULL)
- RETURN(err_serious(-EPROTO));
- object_update_reply_init(reply, count);
+ GOTO(out_free, rc = err_serious(-EPROTO));
+ reply->ourp_magic = UPDATE_REPLY_MAGIC;
tti->tti_u.update.tti_update_reply = reply;
tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
/* Walk through updates in the request to execute them synchronously */
- for (i = 0; i < count; i++) {
+ for (i = 0; i < update_buf_count; i++) {
struct tgt_handler *h;
struct dt_object *dt_obj;
+ int update_count;
+ struct object_update_request *our;
+ int j;
- update = object_update_request_get(ureq, i, NULL);
- if (update == NULL)
- GOTO(out, rc = -EPROTO);
-
+ our = update_bufs[i];
if (ptlrpc_req_need_swab(pill->rc_req))
- lustre_swab_object_update(update);
-
- if (!fid_is_sane(&update->ou_fid)) {
- CERROR("%s: invalid FID "DFID": rc = %d\n",
- tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
- -EPROTO);
- GOTO(out, rc = err_serious(-EPROTO));
- }
+ lustre_swab_object_update_request(our);
- dt_obj = dt_locate(env, dt, &update->ou_fid);
- if (IS_ERR(dt_obj))
- GOTO(out, rc = PTR_ERR(dt_obj));
-
- if (dt->dd_record_fid_accessed) {
- lfsck_pack_rfa(&tti->tti_lr,
- lu_object_fid(&dt_obj->do_lu),
- LE_FID_ACCESSED,
- LFSCK_TYPE_LAYOUT);
- tgt_lfsck_in_notify(env, dt, &tti->tti_lr, NULL);
+ if (our->ourq_magic != UPDATE_REQUEST_MAGIC) {
+ CERROR("%s: invalid update buffer magic %x"
+ " expect %x: rc = %d\n",
+ tgt_name(tsi->tsi_tgt), our->ourq_magic,
+ UPDATE_REQUEST_MAGIC, -EPROTO);
+ GOTO(out, rc = -EPROTO);
}
- tti->tti_u.update.tti_dt_object = dt_obj;
- tti->tti_u.update.tti_update = update;
- tti->tti_u.update.tti_update_reply_index = i;
+ update_count = our->ourq_count;
+ reply->ourp_count += update_count;
+ for (j = 0; j < update_count; j++) {
+ update = object_update_request_get(our, j, NULL);
+ if (update == NULL)
+ GOTO(out, rc = -EPROTO);
+
+ if (ptlrpc_req_need_swab(pill->rc_req))
+ lustre_swab_object_update(update);
+
+ if (!fid_is_sane(&update->ou_fid)) {
+ CERROR("%s: invalid FID "DFID": rc = %d\n",
+ tgt_name(tsi->tsi_tgt),
+ PFID(&update->ou_fid), -EPROTO);
+ GOTO(out, rc = err_serious(-EPROTO));
+ }
- h = out_handler_find(update->ou_type);
- if (unlikely(h == NULL)) {
- CERROR("%s: unsupported opc: 0x%x\n",
- tgt_name(tsi->tsi_tgt), update->ou_type);
- GOTO(next, rc = -ENOTSUPP);
- }
+ dt_obj = dt_locate(env, dt, &update->ou_fid);
+ if (IS_ERR(dt_obj))
+ GOTO(out, rc = PTR_ERR(dt_obj));
+
+ if (dt->dd_record_fid_accessed) {
+ lfsck_pack_rfa(&tti->tti_lr,
+ lu_object_fid(&dt_obj->do_lu),
+ LE_FID_ACCESSED,
+ LFSCK_TYPE_LAYOUT);
+ tgt_lfsck_in_notify(env, dt, &tti->tti_lr,
+ NULL);
+ }
- /* Check resend case only for modifying RPC */
- if (h->th_flags & MUTABOR) {
- struct ptlrpc_request *req = tgt_ses_req(tsi);
+ tti->tti_u.update.tti_dt_object = dt_obj;
+ tti->tti_u.update.tti_update = update;
+ tti->tti_u.update.tti_update_reply_index = reply_index;
- if (out_check_resent(env, dt, dt_obj, req,
- out_reconstruct, reply, i))
- GOTO(next, rc = 0);
- }
-
- /* start transaction for modification RPC only */
- if (h->th_flags & MUTABOR && current_batchid == -1) {
- current_batchid = update->ou_batchid;
- rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
- if (rc != 0)
- GOTO(next, rc);
+ h = out_handler_find(update->ou_type);
+ if (unlikely(h == NULL)) {
+ CERROR("%s: unsupported opc: 0x%x\n",
+ tgt_name(tsi->tsi_tgt), update->ou_type);
+ GOTO(next, rc = -ENOTSUPP);
+ }
- if (update->ou_flags & UPDATE_FL_SYNC)
- ta->ta_handle->th_sync = 1;
- }
+ /* Check resend case only for modifying RPC */
+ if (h->th_flags & MUTABOR) {
+ struct ptlrpc_request *req = tgt_ses_req(tsi);
- /* Stop the current update transaction, if the update has
- * different batchid, or read-only update */
- if (((current_batchid != update->ou_batchid) ||
- !(h->th_flags & MUTABOR)) && ta->ta_handle != NULL) {
- rc = out_tx_end(env, ta, rc);
- current_batchid = -1;
- if (rc != 0)
- GOTO(next, rc);
+ if (out_check_resent(env, dt, dt_obj, req,
+ out_reconstruct, reply,
+ reply_index))
+ GOTO(next, rc = 0);
+ }
- /* start a new transaction if needed */
- if (h->th_flags & MUTABOR) {
+ /* start transaction for modification RPC only */
+ if (h->th_flags & MUTABOR && current_batchid == -1) {
+ current_batchid = update->ou_batchid;
rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
if (rc != 0)
GOTO(next, rc);
if (update->ou_flags & UPDATE_FL_SYNC)
ta->ta_handle->th_sync = 1;
+ }
- current_batchid = update->ou_batchid;
+ /* Stop the current update transaction, if the update
+ * has different batchid, or read-only update */
+ if (((current_batchid != update->ou_batchid) ||
+ !(h->th_flags & MUTABOR)) &&
+ ta->ta_handle != NULL) {
+ rc = out_tx_end(env, ta, rc);
+ current_batchid = -1;
+ if (rc != 0)
+ GOTO(next, rc);
+
+ /* start a new transaction if needed */
+ if (h->th_flags & MUTABOR) {
+ rc = out_tx_start(env, dt, ta,
+ tsi->tsi_exp);
+ if (rc != 0)
+ GOTO(next, rc);
+ if (update->ou_flags & UPDATE_FL_SYNC)
+ ta->ta_handle->th_sync = 1;
+ current_batchid = update->ou_batchid;
+ }
}
- }
- rc = h->th_act(tsi);
+ rc = h->th_act(tsi);
next:
- lu_object_put(env, &dt_obj->do_lu);
- if (rc < 0)
- GOTO(out, rc);
+ reply_index++;
+ lu_object_put(env, &dt_obj->do_lu);
+ if (rc < 0)
+ GOTO(out, rc);
+ }
}
out:
if (current_batchid != -1) {
rc = rc1;
}
+out_free:
+ oub = req_capsule_client_get(pill, &RMF_OUT_UPDATE_BUF);
+ if (update_bufs != NULL) {
+ for (i = 0; i < update_buf_count; i++, oub++) {
+ if (update_bufs[i] != NULL)
+ OBD_FREE(update_bufs[i], oub->oub_size);
+ }
+ OBD_FREE(update_bufs, sizeof(update_bufs[0]) *
+ update_buf_count);
+ }
+
+ if (desc != NULL)
+ ptlrpc_free_bulk(desc);
+
RETURN(rc);
}
#include <obd_class.h>
#include "tgt_internal.h"
-#define OUT_UPDATE_BUFFER_SIZE_ADD 4096
-#define OUT_UPDATE_BUFFER_SIZE_MAX (256 * 4096) /* 1MB update size now */
-
const char *update_op_str(__u16 opc)
{
static const char *opc_str[] = {
*
* \params[in] env execution environment
* \params[in] update object update to be filled
- * \params[in] max_update_size maximum object update size, if the current
- * update length equals or exceeds the size, it
- * will return -E2BIG.
+ * \params[in,out] max_update_size maximum object update size, if the
+ * current update length equals or
+ * exceeds the size, it will return -E2BIG.
* \params[in] update_op update type
* \params[in] fid object FID of the update
- * \params[in] params_count the count of the update parameters
- * \params[in] params_sizes the length of each parameters
+ * \params[in] param_count the count of the update parameters
+ * \params[in] param_sizes the length of each parameters
*
* \retval 0 if packing succeeds.
* \retval -E2BIG if packing exceeds the maximum length.
*/
int out_update_header_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
- enum update_type update_op, const struct lu_fid *fid,
- unsigned int param_count, __u16 *params_sizes)
+ struct object_update *update,
+ size_t *max_update_size,
+ enum update_type update_op,
+ const struct lu_fid *fid,
+ unsigned int param_count, __u16 *param_sizes)
{
struct object_update_param *param;
unsigned int i;
/* Check whether the packing exceeds the maximum update length */
update_size = sizeof(*update);
for (i = 0; i < param_count; i++)
- update_size += cfs_size_round(sizeof(*param) + params_sizes[i]);
+ update_size += cfs_size_round(sizeof(*param) + param_sizes[i]);
- if (unlikely(update_size >= max_update_size))
+ if (unlikely(update_size >= *max_update_size)) {
+ *max_update_size = update_size;
return -E2BIG;
+ }
update->ou_fid = *fid;
update->ou_type = update_op;
update->ou_params_count = param_count;
param = &update->ou_params[0];
for (i = 0; i < param_count; i++) {
- param->oup_len = params_sizes[i];
+ param->oup_len = param_sizes[i];
param = (struct object_update_param *)((char *)param +
object_update_param_size(param));
}
* \retval negative errno if updates packing fails
**/
int out_update_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, enum update_type op,
+ size_t *max_update_size, enum update_type op,
const struct lu_fid *fid, unsigned int param_count,
__u16 *param_sizes, const void **param_bufs)
{
* \retval negative errno if insertion fails.
*/
int out_create_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid,
+ size_t *max_update_size, const struct lu_fid *fid,
const struct lu_attr *attr, struct dt_allocation_hint *hint,
struct dt_object_format *dof)
{
EXPORT_SYMBOL(out_create_pack);
int out_ref_del_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid)
+ size_t *max_update_size, const struct lu_fid *fid)
{
return out_update_pack(env, update, max_update_size, OUT_REF_DEL, fid,
0, NULL, NULL);
EXPORT_SYMBOL(out_ref_del_pack);
int out_ref_add_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid)
+ size_t *max_update_size, const struct lu_fid *fid)
{
return out_update_pack(env, update, max_update_size, OUT_REF_ADD, fid,
0, NULL, NULL);
EXPORT_SYMBOL(out_ref_add_pack);
int out_attr_set_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid,
+ size_t *max_update_size, const struct lu_fid *fid,
const struct lu_attr *attr)
{
struct obdo *obdo;
EXPORT_SYMBOL(out_attr_set_pack);
int out_xattr_set_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid,
+ size_t *max_update_size, const struct lu_fid *fid,
const struct lu_buf *buf, const char *name, __u32 flag)
{
__u16 sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
EXPORT_SYMBOL(out_xattr_set_pack);
int out_xattr_del_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid,
+ size_t *max_update_size, const struct lu_fid *fid,
const char *name)
{
__u16 size = strlen(name) + 1;
}
EXPORT_SYMBOL(out_xattr_del_pack);
-
int out_index_insert_pack(const struct lu_env *env,
struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid,
+ size_t *max_update_size, const struct lu_fid *fid,
const struct dt_rec *rec, const struct dt_key *key)
{
struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
int out_index_delete_pack(const struct lu_env *env,
struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid,
+ size_t *max_update_size, const struct lu_fid *fid,
const struct dt_key *key)
{
__u16 size = strlen((char *)key) + 1;
int out_object_destroy_pack(const struct lu_env *env,
struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid)
+ size_t *max_update_size, const struct lu_fid *fid)
{
return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid,
0, NULL, NULL);
EXPORT_SYMBOL(out_object_destroy_pack);
int out_write_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid,
+ size_t *max_update_size, const struct lu_fid *fid,
const struct lu_buf *buf, __u64 pos)
{
__u16 sizes[2] = {buf->lb_len, sizeof(pos)};
**/
int out_index_lookup_pack(const struct lu_env *env,
struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid,
+ size_t *max_update_size, const struct lu_fid *fid,
struct dt_rec *rec, const struct dt_key *key)
{
const void *name = key;
EXPORT_SYMBOL(out_index_lookup_pack);
int out_attr_get_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid)
+ size_t *max_update_size, const struct lu_fid *fid)
{
return out_update_pack(env, update, max_update_size, OUT_ATTR_GET,
fid, 0, NULL, NULL);
EXPORT_SYMBOL(out_attr_get_pack);
int out_xattr_get_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid,
+ size_t *max_update_size, const struct lu_fid *fid,
const char *name)
{
__u16 size;
EXPORT_SYMBOL(out_xattr_get_pack);
int out_read_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_length, const struct lu_fid *fid,
+ size_t *max_update_size, const struct lu_fid *fid,
size_t size, loff_t pos)
{
__u16 sizes[2] = {sizeof(size), sizeof(pos)};
size = cpu_to_le64(size);
pos = cpu_to_le64(pos);
- return out_update_pack(env, update, max_update_length, OUT_READ, fid,
+ return out_update_pack(env, update, max_update_size, OUT_READ, fid,
ARRAY_SIZE(sizes), sizes, bufs);
}
EXPORT_SYMBOL(out_read_pack);
# bug number for skipped tests:
# b=17466/LU-472 : 61d
# LU-5319 : 53a 53d
-ALWAYS_EXCEPT="61d 53a 53d $REPLAY_SINGLE_EXCEPT"
+# LU-6780 : 80d 80h 81d 81h 110e 110f 110g 111c 111d 111e 111f 111g 112
+ALWAYS_EXCEPT="61d 53a 53d 80d 80h 81d 81h 110e 110f 110g 111c 111d 111e 111f 111g 112 $REPLAY_SINGLE_EXCEPT"
# UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!
case "$(lsb_release -sr)" in # only disable tests for el7
#define lustre_swab_object_update_result NULL
#define lustre_swab_object_update_reply NULL
#define lustre_swab_object_update_request NULL
+#define lustre_swab_out_update_header NULL
+#define lustre_swab_out_update_buffer NULL
#define dump_rniobuf NULL
#define dump_ioo NULL