+/* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
+ * a single page on the send/receive side. XXX: 512 should be changed
+ * to more adequate value. */
+static inline int ldlm_req_handles_avail(struct obd_export *exp,
+ int *size, int bufcount,
+ int bufoff, int off)
+{
+ int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
+ int old_size = size[bufoff];
+
+ size[bufoff] = sizeof(struct ldlm_request);
+ avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
+ bufcount, size);
+ avail /= sizeof(struct lustre_handle);
+ avail += LDLM_LOCKREQ_HANDLES - off;
+ size[bufoff] = old_size;
+
+ return avail;
+}
+
+static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
+{
+ int size[2] = { sizeof(struct ptlrpc_body),
+ sizeof(struct ldlm_request) };
+ return ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
+}
+
+/* Cancel lru locks and pack them into the enqueue request. Pack there the given
+ * @count locks in @cancels. */
+int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
+ int version, int opc, int canceloff,
+ struct list_head *cancels, int count)
+{
+ struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+ struct req_capsule *pill = &req->rq_pill;
+ struct ldlm_request *dlm = NULL;
+ int flags, avail, to_free, bufcount, pack = 0;
+ int rc;
+ ENTRY;
+
+
+ LASSERT(cancels != NULL);
+
+ if (exp_connect_cancelset(exp)) {
+ /* Estimate the amount of available space in the request. */
+ bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
+ avail = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
+ bufcount, bufcount - 1, canceloff);
+ flags = ns_connect_lru_resize(ns) ?
+ LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
+ to_free = !ns_connect_lru_resize(ns) &&
+ opc == LDLM_ENQUEUE ? 1 : 0;
+
+ /* Cancel lru locks here _only_ if the server supports
+ * EARLY_CANCEL. Otherwise we have to send extra CANCEL
+ * rpc, what will make us slower. */
+ if (avail > count)
+ count += ldlm_cancel_lru_local(ns, cancels, to_free,
+ avail - count, 0, flags);
+ if (avail > count)
+ pack = count;
+ else
+ pack = avail;
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
+ ldlm_request_bufsize(count, opc));
+ }
+
+ rc = ptlrpc_request_pack(req, version, opc);
+ if (rc) {
+ ldlm_lock_list_put(cancels, l_bl_ast, count);
+ RETURN(rc);
+ }
+
+ if (exp_connect_cancelset(exp)) {
+ if (canceloff) {
+ dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
+ LASSERT(dlm);
+ /* Skip first lock handler in ldlm_request_pack(),
+ * this method will incrment @lock_count according
+ * to the lock handle amount actually written to
+ * the buffer. */
+ dlm->lock_count = canceloff;
+ }
+ /* Pack into the request @pack lock handles. */
+ ldlm_cli_cancel_list(cancels, pack, req, 0);
+ /* Prepare and send separate cancel rpc for others. */
+ ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
+ } else {
+ ldlm_lock_list_put(cancels, l_bl_ast, count);
+ }
+ RETURN(0);
+}
+
+int ldlm_prep_enqueue_req(struct obd_export *exp,
+ struct ptlrpc_request *req,
+ struct list_head *cancels,
+ int count)
+{
+ return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
+ LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
+}
+
+/* If a request has some specific initialisation it is passed in @reqp,
+ * otherwise it is created in ldlm_cli_enqueue.
+ *
+ * Supports sync and async requests, pass @async flag accordingly. If a
+ * request was created in ldlm_cli_enqueue and it is the async request,
+ * pass it to the caller in @reqp. */
+int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
+ struct ldlm_enqueue_info *einfo,
+ const struct ldlm_res_id *res_id,
+ ldlm_policy_data_t *policy, int *flags,
+ void *lvb, __u32 lvb_len, void *lvb_swabber,
+ struct lustre_handle *lockh, int async)
+{
+ struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+ struct ldlm_lock *lock;
+ struct ldlm_request *body;
+ int is_replay = *flags & LDLM_FL_REPLAY;
+ int req_passed_in = 1;
+ int rc, err;
+ struct ptlrpc_request *req;
+ ENTRY;
+
+ LASSERT(exp != NULL);
+
+ /* If we're replaying this lock, just check some invariants.
+ * If we're creating a new lock, get everything all setup nice. */
+ if (is_replay) {
+ lock = ldlm_handle2lock(lockh);
+ LASSERT(lock != NULL);
+ LDLM_DEBUG(lock, "client-side enqueue START");
+ LASSERT(exp == lock->l_conn_export);
+ } else {
+ lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
+ einfo->ei_mode, einfo->ei_cb_bl,
+ einfo->ei_cb_cp, einfo->ei_cb_gl,
+ einfo->ei_cbdata, lvb_len);
+ if (lock == NULL)
+ RETURN(-ENOMEM);
+ /* for the local lock, add the reference */
+ ldlm_lock_addref_internal(lock, einfo->ei_mode);
+ ldlm_lock2handle(lock, lockh);
+ lock->l_lvb_swabber = lvb_swabber;
+ if (policy != NULL) {
+ /* INODEBITS_INTEROP: If the server does not support
+ * inodebits, we will request a plain lock in the
+ * descriptor (ldlm_lock2desc() below) but use an
+ * inodebits lock internally with both bits set.
+ */
+ if (einfo->ei_type == LDLM_IBITS &&
+ !(exp->exp_connect_flags & OBD_CONNECT_IBITS))
+ lock->l_policy_data.l_inodebits.bits =
+ MDS_INODELOCK_LOOKUP |
+ MDS_INODELOCK_UPDATE;
+ else
+ lock->l_policy_data = *policy;
+ }
+
+ if (einfo->ei_type == LDLM_EXTENT)
+ lock->l_req_extent = policy->l_extent;
+ LDLM_DEBUG(lock, "client-side enqueue START");
+ }
+
+ /* lock not sent to server yet */
+
+ if (reqp == NULL || *reqp == NULL) {
+ req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+ &RQF_LDLM_ENQUEUE,
+ LUSTRE_DLM_VERSION,
+ LDLM_ENQUEUE);
+ if (req == NULL) {
+ failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
+ LDLM_LOCK_PUT(lock);
+ RETURN(-ENOMEM);
+ }
+ req_passed_in = 0;
+ if (reqp)
+ *reqp = req;
+ } else {
+ int len;
+
+ req = *reqp;
+ len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
+ RCL_CLIENT);
+ LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
+ DLM_LOCKREQ_OFF, len, sizeof(*body));
+ }
+
+ lock->l_conn_export = exp;
+ lock->l_export = NULL;
+ lock->l_blocking_ast = einfo->ei_cb_bl;
+
+ /* Dump lock data into the request buffer */
+ body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+ ldlm_lock2desc(lock, &body->lock_desc);
+ body->lock_flags = *flags;
+ body->lock_handle[0] = *lockh;
+
+ /* Continue as normal. */
+ if (!req_passed_in) {
+ if (lvb_len > 0) {
+ req_capsule_extend(&req->rq_pill,
+ &RQF_LDLM_ENQUEUE_LVB);
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+ RCL_SERVER, lvb_len);
+ }
+ ptlrpc_request_set_replen(req);
+ }
+
+ /*
+ * Liblustre client doesn't get extent locks, except for O_APPEND case
+ * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
+ * [i_size, OBD_OBJECT_EOF] lock is taken.
+ */
+ LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
+ policy->l_extent.end == OBD_OBJECT_EOF));
+
+ if (async) {
+ LASSERT(reqp != NULL);
+ RETURN(0);
+ }
+
+ LDLM_DEBUG(lock, "sending request");
+ rc = ptlrpc_queue_wait(req);
+ err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
+ einfo->ei_mode, flags, lvb, lvb_len,
+ lvb_swabber, lockh, rc);
+
+ /* If ldlm_cli_enqueue_fini did not find the lock, we need to free
+ * one reference that we took */
+ if (err == -ENOLCK)
+ LDLM_LOCK_PUT(lock);
+ else
+ rc = err;
+
+ if (!req_passed_in && req != NULL) {
+ ptlrpc_req_finished(req);
+ if (reqp)
+ *reqp = NULL;
+ }
+
+ RETURN(rc);
+}
+