+ int size[2] = { sizeof(struct ptlrpc_body),
+ sizeof(struct ldlm_request) };
+ return ldlm_req_handles_avail(exp, size, 2, 0);
+}
+
+/* Prepare an LDLM_ENQUEUE request, piggybacking lock cancels on it.
+ *
+ * The given @count locks already collected on @cancels are packed into the
+ * request.  If the server supports early cancel (exp_connect_cancelset()),
+ * additional aged locks are cancelled from the namespace LRU to fill the
+ * space remaining in the DLM_LOCKREQ buffer.  If request allocation fails,
+ * the locks on the cancel list are released instead of being packed.
+ *
+ * Returns the prepared request, or NULL on allocation failure. */
+struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
+ int bufcount, int *size,
+ struct list_head *cancels,
+ int count)
+{
+ struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+ struct ldlm_request *dlm = NULL;
+ struct ptlrpc_request *req;
+ CFS_LIST_HEAD(head);
+ ENTRY;
+
+ /* Callers with no cancels of their own may pass NULL; use a local
+ * list head so the code below can treat both cases uniformly. */
+ if (cancels == NULL)
+ cancels = &head;
+ if (exp_connect_cancelset(exp)) {
+ /* Estimate the amount of available space in the request. */
+ int avail = ldlm_req_handles_avail(exp, size, bufcount,
+ LDLM_ENQUEUE_CANCEL_OFF);
+ LASSERT(avail >= count);
+
+ /* Cancel lru locks here _only_ if the server supports
+ * EARLY_CANCEL. Otherwise we have to send extra CANCEL
+ * rpc right on enqueue, what will make it slower, vs.
+ * asynchronous rpc in blocking thread. */
+ count += ldlm_cancel_lru_local(ns, cancels,
+ exp_connect_lru_resize(exp) ? 0 : 1,
+ avail - count, LDLM_CANCEL_AGED);
+ /* Grow the lock-request buffer to hold all cancel handles. */
+ size[DLM_LOCKREQ_OFF] =
+ ldlm_request_bufsize(count, LDLM_ENQUEUE);
+ }
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+ LDLM_ENQUEUE, bufcount, size, NULL);
+ if (exp_connect_cancelset(exp) && req) {
+ dlm = lustre_msg_buf(req->rq_reqmsg,
+ DLM_LOCKREQ_OFF, sizeof(*dlm));
+ /* Skip first lock handle in ldlm_request_pack(); this method
+ * will increment @lock_count according to the lock handle
+ * amount actually written to the buffer. */
+ dlm->lock_count = LDLM_ENQUEUE_CANCEL_OFF;
+ ldlm_cli_cancel_list(cancels, count, req, DLM_LOCKREQ_OFF, 0);
+ } else {
+ /* No request (or no early-cancel support): drop the
+ * references held by the collected cancel list. */
+ ldlm_lock_list_put(cancels, l_bl_ast, count);
+ }
+ RETURN(req);
+}
+
+/* Client-side lock enqueue.
+ *
+ * If a request has some specific initialisation it is passed in @reqp,
+ * otherwise it is created in ldlm_cli_enqueue.
+ *
+ * Supports sync and async requests, pass @async flag accordingly. If a
+ * request was created in ldlm_cli_enqueue and it is the async request,
+ * pass it to the caller in @reqp. */
+int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
+ struct ldlm_enqueue_info *einfo,
+ const struct ldlm_res_id *res_id,
+ ldlm_policy_data_t *policy, int *flags,
+ void *lvb, __u32 lvb_len, void *lvb_swabber,
+ struct lustre_handle *lockh, int async)
+{
+ struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+ struct ldlm_lock *lock;
+ struct ldlm_request *body;
+ struct ldlm_reply *reply;
+ /* Request layout: ptlrpc body, lock request, optional LVB record. */
+ int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+ [DLM_LOCKREQ_OFF] = sizeof(*body),
+ [DLM_REPLY_REC_OFF] = lvb_len };
+ int is_replay = *flags & LDLM_FL_REPLAY;
+ int req_passed_in = 1, rc, err;
+ struct ptlrpc_request *req;
+ ENTRY;
+
+ LASSERT(exp != NULL);
+
+ /* If we're replaying this lock, just check some invariants.
+ * If we're creating a new lock, get everything all setup nice. */
+ if (is_replay) {
+ lock = ldlm_handle2lock(lockh);
+ LASSERT(lock != NULL);
+ LDLM_DEBUG(lock, "client-side enqueue START");
+ LASSERT(exp == lock->l_conn_export);
+ } else {
+ lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
+ einfo->ei_mode, einfo->ei_cb_bl,
+ einfo->ei_cb_cp, einfo->ei_cb_gl,
+ einfo->ei_cbdata, lvb_len);
+ if (lock == NULL)
+ RETURN(-ENOMEM);
+ /* for the local lock, add the reference */
+ ldlm_lock_addref_internal(lock, einfo->ei_mode);
+ ldlm_lock2handle(lock, lockh);
+ lock->l_lvb_swabber = lvb_swabber;
+ if (policy != NULL) {
+ /* INODEBITS_INTEROP: If the server does not support
+ * inodebits, we will request a plain lock in the
+ * descriptor (ldlm_lock2desc() below) but use an
+ * inodebits lock internally with both bits set.
+ */
+ if (einfo->ei_type == LDLM_IBITS &&
+ !(exp->exp_connect_flags & OBD_CONNECT_IBITS))
+ lock->l_policy_data.l_inodebits.bits =
+ MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_UPDATE;
+ else
+ lock->l_policy_data = *policy;
+ }
+
+ /* NOTE(review): extent locks appear to assume policy != NULL
+ * here — confirm callers always pass one for LDLM_EXTENT. */
+ if (einfo->ei_type == LDLM_EXTENT)
+ lock->l_req_extent = policy->l_extent;
+ LDLM_DEBUG(lock, "client-side enqueue START");
+ }
+
+ /* lock not sent to server yet */
+
+ if (reqp == NULL || *reqp == NULL) {
+ req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
+ if (req == NULL) {
+ /* Request allocation failed: undo the local lock
+ * setup done above and drop our reference. */
+ failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
+ LDLM_LOCK_PUT(lock);
+ RETURN(-ENOMEM);
+ }
+ req_passed_in = 0;
+ if (reqp)
+ *reqp = req;
+ } else {
+ /* Caller supplied a pre-initialised request; verify its
+ * lock-request buffer is large enough. */
+ req = *reqp;
+ LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >=
+ sizeof(*body), "buflen[%d] = %d, not "LPSZ"\n",
+ DLM_LOCKREQ_OFF,
+ lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF),
+ sizeof(*body));
+ }
+
+ /* Bind the lock to its connection; l_export stays NULL on the
+ * client side (it is only set for server-side locks). */
+ lock->l_conn_export = exp;
+ lock->l_export = NULL;
+ lock->l_blocking_ast = einfo->ei_cb_bl;
+
+ /* Dump lock data into the request buffer */
+ body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+ ldlm_lock2desc(lock, &body->lock_desc);
+ body->lock_flags = *flags;
+ body->lock_handle[0] = *lockh;
+
+ /* Continue as normal. */
+ if (!req_passed_in) {
+ /* Reply holds a lock reply plus an LVB buffer if requested. */
+ size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
+ ptlrpc_req_set_repsize(req, 2 + (lvb_len > 0), size);
+ }
+
+ /*
+ * Liblustre client doesn't get extent locks, except for O_APPEND case
+ * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
+ * [i_size, OBD_OBJECT_EOF] lock is taken.
+ */
+ LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
+ policy->l_extent.end == OBD_OBJECT_EOF));
+
+ /* Async caller completes the enqueue later; hand the request back. */
+ if (async) {
+ LASSERT(reqp != NULL);
+ RETURN(0);
+ }
+
+ LDLM_DEBUG(lock, "sending request");
+ rc = ptlrpc_queue_wait(req);
+ err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
+ einfo->ei_mode, flags, lvb, lvb_len,
+ lvb_swabber, lockh, rc);
+
+ /* If ldlm_cli_enqueue_fini did not find the lock, we need to free
+ * one reference that we took */
+ if (err == -ENOLCK)
+ LDLM_LOCK_PUT(lock);
+ else
+ rc = err;
+
+ /* We created the request ourselves: release it and clear the
+ * caller's pointer so it is not reused. */
+ if (!req_passed_in && req != NULL) {
+ ptlrpc_req_finished(req);
+ if (reqp)
+ *reqp = NULL;
+ }
+
+ RETURN(rc);