* a single page on the send/receive side. XXX: 512 should be changed
* to more adequate value. */
static inline int ldlm_req_handles_avail(struct obd_export *exp,
- int *size, int bufcount, int off)
+ int *size, int bufcount,
+ int bufoff, int off)
{
int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
- int old_size = size[DLM_LOCKREQ_OFF];
+ int old_size = size[bufoff];
- size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);
+ size[bufoff] = sizeof(struct ldlm_request);
avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
bufcount, size);
avail /= sizeof(struct lustre_handle);
avail += LDLM_LOCKREQ_HANDLES - off;
- size[DLM_LOCKREQ_OFF] = old_size;
+ size[bufoff] = old_size;
return avail;
}
{
int size[2] = { sizeof(struct ptlrpc_body),
sizeof(struct ldlm_request) };
- return ldlm_req_handles_avail(exp, size, 2, 0);
+ return ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
}
/* Cancel lru locks and pack them into the enqueue request. Pack there the given
* @count locks in @cancels. */
-struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
- int bufcount, int *size,
- struct list_head *cancels,
- int count)
+struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
+ int opc, int bufcount, int *size,
+ int bufoff, int canceloff,
+ struct list_head *cancels, int count)
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+ int flags, avail, to_free, pack = 0;
struct ldlm_request *dlm = NULL;
struct ptlrpc_request *req;
CFS_LIST_HEAD(head);
cancels = &head;
if (exp_connect_cancelset(exp)) {
/* Estimate the amount of available space in the request. */
- int avail = ldlm_req_handles_avail(exp, size, bufcount,
- LDLM_ENQUEUE_CANCEL_OFF);
- int flags, cancel;
-
- LASSERT(avail >= count);
-
+ avail = ldlm_req_handles_avail(exp, size, bufcount,
+ bufoff, canceloff);
flags = ns_connect_lru_resize(ns) ?
LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
- cancel = ns_connect_lru_resize(ns) ? 0 : 1;
+ to_free = !ns_connect_lru_resize(ns) &&
+ opc == LDLM_ENQUEUE ? 1 : 0;
/* Cancel lru locks here _only_ if the server supports
* EARLY_CANCEL. Otherwise we have to send extra CANCEL
- * rpc right on enqueue, what will make it slower, vs.
- * asynchronous rpc in blocking thread. */
- count += ldlm_cancel_lru_local(ns, cancels, cancel,
- avail - count, 0, flags);
- size[DLM_LOCKREQ_OFF] =
- ldlm_request_bufsize(count, LDLM_ENQUEUE);
- }
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
- LDLM_ENQUEUE, bufcount, size, NULL);
+ * rpc, what will make us slower. */
+ if (avail > count)
+ count += ldlm_cancel_lru_local(ns, cancels, to_free,
+ avail - count, 0, flags);
+ if (avail > count)
+ pack = count;
+ else
+ pack = avail;
+ size[bufoff] = ldlm_request_bufsize(pack, opc);
+ }
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), version,
+ opc, bufcount, size, NULL);
if (exp_connect_cancelset(exp) && req) {
- dlm = lustre_msg_buf(req->rq_reqmsg,
- DLM_LOCKREQ_OFF, sizeof(*dlm));
- /* Skip first lock handler in ldlm_request_pack(), this method
- * will incrment @lock_count according to the lock handle amount
- * actually written to the buffer. */
- dlm->lock_count = LDLM_ENQUEUE_CANCEL_OFF;
- ldlm_cli_cancel_list(cancels, count, req, DLM_LOCKREQ_OFF, 0);
+ if (canceloff) {
+ dlm = lustre_msg_buf(req->rq_reqmsg, bufoff,
+ sizeof(*dlm));
+ /* Skip first lock handler in ldlm_request_pack(),
+ * this method will incrment @lock_count according
+ * to the lock handle amount actually written to
+ * the buffer. */
+ dlm->lock_count = canceloff;
+ }
+ /* Pack into the request @pack lock handles. */
+ ldlm_cli_cancel_list(cancels, pack, req, bufoff, 0);
+ /* Prepare and send separate cancel rpc for others. */
+ ldlm_cli_cancel_list(cancels, count - pack, NULL, 0, 0);
} else {
ldlm_lock_list_put(cancels, l_bl_ast, count);
}
RETURN(req);
}
+struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
+ int bufcount, int *size,
+ struct list_head *cancels,
+ int count)
+{
+ return ldlm_prep_elc_req(exp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
+ bufcount, size, DLM_LOCKREQ_OFF,
+ LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
+}
+
/* If a request has some specific initialisation it is passed in @reqp,
* otherwise it is created in ldlm_cli_enqueue.
*
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
RETURN(count);
- free = ldlm_req_handles_avail(exp, size, 2, 0);
+ free = ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
if (count > free)
count = free;
* here and send them all as one LDLM_CANCEL rpc. */
LASSERT(list_empty(&lock->l_bl_ast));
list_add(&lock->l_bl_ast, &cancels);
- avail = ldlm_cancel_handles_avail(lock->l_conn_export);
- LASSERT(avail > 0);
+ if (exp_connect_cancelset(lock->l_conn_export)) {
+ avail = ldlm_cancel_handles_avail(lock->l_conn_export);
+ LASSERT(avail > 0);
- ns = lock->l_resource->lr_namespace;
- flags = ns_connect_lru_resize(ns) ? LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
- count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - count,
- LDLM_FL_BL_AST, flags);
+ ns = lock->l_resource->lr_namespace;
+ flags = ns_connect_lru_resize(ns) ?
+ LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
+ count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
+ LDLM_FL_BL_AST, flags);
+ }
ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
RETURN(0);
}
count -= res;
ldlm_lock_list_put(cancels, l_bl_ast, res);
}
- LASSERT(list_empty(cancels));
LASSERT(count == 0);
RETURN(0);
}
RETURN(count);
}
+struct ptlrpc_request *mdc_prep_elc_req(struct obd_export *exp,
+ int bufcount, int *size, int off,
+ struct list_head *cancels, int count)
+{
+ return ldlm_prep_elc_req(exp, LUSTRE_MDS_VERSION, MDS_REINT,
+ bufcount, size, off, 0, cancels, count);
+}
+
/* If mdc_setattr is called with an 'iattr', then it is a normal RPC that
* should take the normal semaphore and go to the normal portal.
*
(fid_is_sane(&op_data->op_fid1)))
count = mdc_resource_get_unused(exp, &op_data->op_fid1,
&cancels, LCK_EX, bits);
- if (exp_connect_cancelset(exp) && count) {
+ if (exp_connect_cancelset(exp) && count)
bufcount = 7;
- size[REQ_REC_OFF + 5] = ldlm_request_bufsize(count, MDS_REINT);
- }
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
- MDS_REINT, bufcount, size, NULL);
- if (exp_connect_cancelset(exp) && req)
- ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 5, 0);
- else
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
+ req = mdc_prep_elc_req(exp, bufcount, size,
+ REQ_REC_OFF + 5, &cancels, count);
if (req == NULL)
RETURN(-ENOMEM);
count = mdc_resource_get_unused(exp, &op_data->op_fid1,
&cancels, LCK_EX,
MDS_INODELOCK_UPDATE);
- if (exp_connect_cancelset(exp) && count) {
+ if (exp_connect_cancelset(exp) && count)
bufcount = 6;
- size[REQ_REC_OFF + 4] = ldlm_request_bufsize(count, MDS_REINT);
- }
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
- MDS_REINT, bufcount, size, NULL);
- if (exp_connect_cancelset(exp) && req)
- ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 4, 0);
- else
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
+ req = mdc_prep_elc_req(exp, bufcount, size,
+ REQ_REC_OFF + 4, &cancels, count);
if (req == NULL)
RETURN(-ENOMEM);
count += mdc_resource_get_unused(exp, &op_data->op_fid3,
&cancels, LCK_EX,
MDS_INODELOCK_FULL);
- if (exp_connect_cancelset(exp) && count) {
+ if (exp_connect_cancelset(exp) && count)
bufcount = 5;
- size[REQ_REC_OFF + 3] = ldlm_request_bufsize(count, MDS_REINT);
- }
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
- MDS_REINT, bufcount, size, NULL);
- if (exp_connect_cancelset(exp) && req)
- ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 3, 0);
- else
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
+
+ req = mdc_prep_elc_req(exp, bufcount, size,
+ REQ_REC_OFF + 3, &cancels, count);
if (req == NULL)
RETURN(-ENOMEM);
count += mdc_resource_get_unused(exp, &op_data->op_fid1,
&cancels, LCK_EX,
MDS_INODELOCK_UPDATE);
- if (exp_connect_cancelset(exp) && count) {
+ if (exp_connect_cancelset(exp) && count)
bufcount = 6;
- size[REQ_REC_OFF + 4] = ldlm_request_bufsize(count, MDS_REINT);
- }
-
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
- MDS_REINT, bufcount, size, NULL);
- if (exp_connect_cancelset(exp) && req)
- ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 4, 0);
- else
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
+ req = mdc_prep_elc_req(exp, bufcount, size,
+ REQ_REC_OFF + 4, &cancels, count);
if (req == NULL)
RETURN(-ENOMEM);
count += mdc_resource_get_unused(exp, &op_data->op_fid4,
&cancels, LCK_EX,
MDS_INODELOCK_FULL);
- if (exp_connect_cancelset(exp) && count) {
+ if (exp_connect_cancelset(exp) && count)
bufcount = 7;
- size[REQ_REC_OFF + 5] = ldlm_request_bufsize(count, MDS_REINT);
- }
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
- MDS_REINT, bufcount, size, NULL);
- if (exp_connect_cancelset(exp) && req)
- ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 5, 0);
- else
- ldlm_lock_list_put(&cancels, l_bl_ast, count);
+ req = mdc_prep_elc_req(exp, bufcount, size,
+ REQ_REC_OFF + 5, &cancels, count);
if (req == NULL)
RETURN(-ENOMEM);