int lustre_shrink_msg(struct lustre_msg *msg, int segment,
unsigned int newlen, int move_data);
void lustre_free_reply_state(struct ptlrpc_reply_state *rs);
+int lustre_msg_hdr_size(__u32 magic, int count);
int lustre_msg_size(__u32 magic, int count, int *lengths);
int lustre_msg_size_v2(int count, int *lengths);
int lustre_packed_msg_size(struct lustre_msg *msg);
int req_capsule_get_size(const struct req_capsule *pill,
const struct req_msg_field *field,
enum req_location loc);
+int req_capsule_msg_size(const struct req_capsule *pill, enum req_location loc);
+int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt,
+ enum req_location loc);
void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt);
int req_capsule_has_field(const struct req_capsule *pill,
/* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
* a single page on the send/receive side. XXX: 512 should be changed
* to more adequate value. */
+/*
+ * Return how many struct lustre_handle slots still fit in a request of
+ * @req_size bytes, given a total budget of min(LDLM_MAXREQSIZE,
+ * PAGE_SIZE - 512).  @off handles are already accounted for in the
+ * fixed LDLM_LOCKREQ_HANDLES part of the ldlm_request, so they are
+ * added back.  NOTE(review): result can be <= 0 if @req_size exceeds
+ * the budget — callers appear to treat that as "no room"; confirm.
+ */
-static inline int ldlm_req_handles_avail(struct obd_export *exp,
- int *size, int bufcount,
- int bufoff, int off)
+static inline int ldlm_req_handles_avail(int req_size, int off)
{
- int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
- int old_size = size[bufoff];
+ int avail;
- size[bufoff] = sizeof(struct ldlm_request);
- avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
- bufcount, size);
+ avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512) - req_size;
 avail /= sizeof(struct lustre_handle);
 avail += LDLM_LOCKREQ_HANDLES - off;
- size[bufoff] = old_size;
 return avail;
}
+/*
+ * Handles available in an already-prepared request: compute the current
+ * message size from the capsule's filled-in field sizes at @loc, then
+ * delegate to ldlm_req_handles_avail().  Replaces the old
+ * ldlm_cancel_handles_avail(), which rebuilt the size array by hand.
+ */
-static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
+static inline int ldlm_capsule_handles_avail(struct req_capsule *pill,
+ enum req_location loc,
+ int off)
{
- int size[2] = { sizeof(struct ptlrpc_body),
- sizeof(struct ldlm_request) };
- return ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
+ int size = req_capsule_msg_size(pill, loc);
+ return ldlm_req_handles_avail(size, off);
+}
+
+/*
+ * Handles available for a request that has not been allocated yet:
+ * estimate its size from the request format @fmt (every fixed-size
+ * field at its declared default size) using the import's message
+ * magic, then delegate to ldlm_req_handles_avail().
+ */
+static inline int ldlm_format_handles_avail(struct obd_import *imp,
+ const struct req_format *fmt,
+ enum req_location loc, int off)
+{
+ int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
+ return ldlm_req_handles_avail(size, off);
+}
/* Cancel lru locks and pack them into the enqueue request. Pack there the given
struct req_capsule *pill = &req->rq_pill;
struct ldlm_request *dlm = NULL;
int flags, avail, to_free, bufcount, pack = 0;
+ CFS_LIST_HEAD(head);
int rc;
ENTRY;
-
- LASSERT(cancels != NULL);
-
+ if (cancels == NULL)
+ cancels = &head;
if (exp_connect_cancelset(exp)) {
/* Estimate the amount of available space in the request. */
bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
- avail = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
- bufcount, bufcount - 1, canceloff);
+ avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
+
flags = ns_connect_lru_resize(ns) ?
LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
to_free = !ns_connect_lru_resize(ns) &&
pack = count;
else
pack = avail;
- req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
- ldlm_request_bufsize(count, opc));
+ req_capsule_set_size(pill, &RMF_DLM_REQ, RCL_CLIENT,
+ ldlm_request_bufsize(pack, opc));
}
rc = ptlrpc_request_pack(req, version, opc);
RETURN(0);
}
-int ldlm_prep_enqueue_req(struct obd_export *exp,
- struct ptlrpc_request *req,
- struct list_head *cancels,
- int count)
+int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
+ struct list_head *cancels, int count)
{
return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
RETURN(count);
+ free = ldlm_format_handles_avail(class_exp2cliimp(exp),
+ &RQF_LDLM_CANCEL, RCL_CLIENT, 0);
+ if (count > free)
+ count = free;
+
while (1) {
int bufcount;
- struct req_capsule *pill;
+
imp = class_exp2cliimp(exp);
if (imp == NULL || imp->imp_invalid) {
CDEBUG(D_DLMTRACE,
if (req == NULL)
GOTO(out, rc = -ENOMEM);
- pill = &req->rq_pill;
- bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
-
- free = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
- bufcount, bufcount, 0);
- if (count > free)
- count = free;
-
+ bufcount = req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT);
req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
ldlm_request_bufsize(count, LDLM_CANCEL));
int ldlm_cli_cancel(struct lustre_handle *lockh)
{
+ struct obd_export *exp;
int avail, flags, count = 1, rc = 0;
struct ldlm_namespace *ns;
struct ldlm_lock *lock;
* here and send them all as one LDLM_CANCEL rpc. */
LASSERT(list_empty(&lock->l_bl_ast));
list_add(&lock->l_bl_ast, &cancels);
- if (exp_connect_cancelset(lock->l_conn_export)) {
- avail = ldlm_cancel_handles_avail(lock->l_conn_export);
+
+ exp = lock->l_conn_export;
+ if (exp_connect_cancelset(exp)) {
+ avail = ldlm_format_handles_avail(class_exp2cliimp(exp),
+ &RQF_LDLM_CANCEL,
+ RCL_CLIENT, 0);
LASSERT(avail > 0);
ns = lock->l_resource->lr_namespace;
RCL_CLIENT, 0);
}
- if (exp_connect_cancelset(exp) && count) {
- req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
- ldlm_request_bufsize(count, LDLM_ENQUEUE));
- }
-
- rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+ rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
if (rc) {
ptlrpc_request_free(req);
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
return NULL;
}
- if (exp_connect_cancelset(exp) && req)
- ldlm_cli_cancel_list(&cancels, count, req, 0);
if (joinfile) {
__u64 head_size = *(__u64 *)op_data->op_data;
req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
op_data->op_namelen + 1);
- rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+ rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
if (rc) {
ptlrpc_request_free(req);
RETURN(ERR_PTR(rc));
req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
op_data->op_namelen + 1);
- rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+ rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
if (rc) {
ptlrpc_request_free(req);
RETURN(ERR_PTR(rc));
static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
{
struct ptlrpc_request *req;
+ int rc;
ENTRY;
- req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
- &RQF_LDLM_ENQUEUE, LUSTRE_DLM_VERSION,
- LDLM_ENQUEUE);
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
if (req == NULL)
RETURN(ERR_PTR(-ENOMEM));
+ rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+ if (rc) {
+ ptlrpc_request_free(req);
+ RETURN(ERR_PTR(rc));
+ }
+
ptlrpc_request_set_replen(req);
RETURN(req);
}
rc = mdc_prep_elc_req(exp, req, &cancels, count);
if (rc) {
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
ptlrpc_request_free(req);
RETURN(rc);
}
rc = mdc_prep_elc_req(exp, req, &cancels, count);
if (rc) {
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
ptlrpc_request_free(req);
RETURN(rc);
}
rc = mdc_prep_elc_req(exp, req, &cancels, count);
if (rc) {
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
ptlrpc_request_free(req);
RETURN(rc);
}
rc = mdc_prep_elc_req(exp, req, &cancels, count);
if (rc) {
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
ptlrpc_request_free(req);
RETURN(rc);
}
rc = mdc_prep_elc_req(exp, req, &cancels, count);
if (rc) {
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
ptlrpc_request_free(req);
RETURN(rc);
}
rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
0, &cancels, count);
if (rc) {
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
ptlrpc_request_free(req);
RETURN(rc);
}
}
EXPORT_SYMBOL(req_capsule_get_size);
+/*
+ * Total on-wire message size for the capsule's currently-set field
+ * sizes at @loc, including the lustre_msg header, computed with the
+ * import's message magic.  Assumes pill->rc_req and its rq_import are
+ * valid (i.e. the request has been allocated).
+ */
+int req_capsule_msg_size(const struct req_capsule *pill, enum req_location loc)
+{
+ return lustre_msg_size(pill->rc_req->rq_import->imp_msg_magic,
+ pill->rc_fmt->rf_fields[loc].nr,
+ (int *)pill->rc_area[loc]);
+}
+
+/*
+ * Estimate the message size for format @fmt at @loc before a request
+ * exists: header size for the field count, plus the rounded default
+ * size of every fixed-size field.  Variable-size fields
+ * (rmf_size == -1) contribute nothing to the estimate.  V1 messages do
+ * not carry the ptlrpc_body field on the wire, so the first field is
+ * skipped for LUSTRE_MSG_MAGIC_V1.  Returns a negative value if
+ * lustre_msg_hdr_size() rejects @magic.
+ */
+int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt,
+ enum req_location loc)
+{
+ int size, i = 0;
+
+ size = lustre_msg_hdr_size(magic, fmt->rf_fields[loc].nr);
+ if (size < 0)
+ return size;
+
+ /* V1 wire format has no ptlrpc_body buffer; skip field 0. */
+ if (magic == LUSTRE_MSG_MAGIC_V1)
+ i++;
+
+ for (; i < fmt->rf_fields[loc].nr; ++i)
+ if (fmt->rf_fields[loc].d[i]->rmf_size != -1)
+ size += size_round(fmt->rf_fields[loc].d[i]->rmf_size);
+ return size;
+}
+
#define FMT_FIELD(fmt, i, j) (fmt)->rf_fields[(i)].d[(j)]
void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
return size_round(offsetof(struct lustre_msg_v2, lm_buflens[count]));
}
+/*
+ * Header size for a message carrying @count buffers, dispatched on the
+ * message @magic.  V1 headers do not count the ptlrpc_body buffer,
+ * hence count - 1 for LUSTRE_MSG_MAGIC_V1.  Returns -EINVAL for an
+ * unknown magic (after LASSERTF fires on debug builds).
+ */
+int lustre_msg_hdr_size(__u32 magic, int count)
+{
+ switch (magic) {
+ case LUSTRE_MSG_MAGIC_V1:
+ return lustre_msg_hdr_size_v1(count - 1);
+ case LUSTRE_MSG_MAGIC_V2:
+ return lustre_msg_hdr_size_v2(count);
+ default:
+ LASSERTF(0, "incorrect message magic: %08x\n", magic);
+ return -EINVAL;
+ }
+}
+EXPORT_SYMBOL(lustre_msg_hdr_size);
+
int lustre_msg_swabbed(struct lustre_msg *msg)
{
return (msg->lm_magic == LUSTRE_MSG_MAGIC_V1_SWABBED) ||