+ LDLM_DEBUG(lock, "client-side local convert");
+
+ res = ldlm_lock_convert(lock, new_mode, flags);
+ if (res) {
+ ldlm_reprocess_all(res);
+ rc = 0;
+ } else {
+ rc = EDEADLOCK;
+ }
+ LDLM_DEBUG(lock, "client-side local convert handler END");
+ LDLM_LOCK_PUT(lock);
+ RETURN(rc);
+}
+
+/* FIXME: one of ldlm_cli_convert or the server side should reject attempted
+ * conversion of locks which are on the waiting or converting queue */
+/* Caller of this code is supposed to take care of lock readers/writers
+ accounting */
+int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
+{
+ struct ldlm_request *body;
+ struct ldlm_reply *reply;
+ struct ldlm_lock *lock;
+ struct ldlm_resource *res;
+ struct ptlrpc_request *req;
+ int rc;
+ ENTRY;
+
+ lock = ldlm_handle2lock(lockh);
+ if (!lock) {
+ LBUG();
+ RETURN(-EINVAL);
+ }
+ *flags = 0;
+
+ if (lock->l_conn_export == NULL)
+ RETURN(ldlm_cli_convert_local(lock, new_mode, flags));
+
+ LDLM_DEBUG(lock, "client-side convert");
+
+ req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export),
+ &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
+ LDLM_CONVERT);
+ if (req == NULL) {
+ LDLM_LOCK_PUT(lock);
+ RETURN(-ENOMEM);
+ }
+
+ body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+ body->lock_handle[0] = lock->l_remote_handle;
+
+ body->lock_desc.l_req_mode = new_mode;
+ body->lock_flags = *flags;
+
+
+ ptlrpc_request_set_replen(req);
+ rc = ptlrpc_queue_wait(req);
+ if (rc != ELDLM_OK)
+ GOTO(out, rc);
+
+ reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+ if (reply == NULL)
+ GOTO(out, rc = -EPROTO);
+
+ if (req->rq_status)
+ GOTO(out, rc = req->rq_status);
+
+ res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
+ if (res != NULL) {
+ ldlm_reprocess_all(res);
+ /* Go to sleep until the lock is granted. */
+ /* FIXME: or cancelled. */
+ if (lock->l_completion_ast) {
+ rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
+ NULL);
+ if (rc)
+ GOTO(out, rc);
+ }
+ } else {
+ rc = EDEADLOCK;
+ }
+ EXIT;
+ out:
+ LDLM_LOCK_PUT(lock);
+ ptlrpc_req_finished(req);
+ return rc;
+}
+
+/* Cancel locks locally.
+ * Returns:
+ * LDLM_FL_LOCAL_ONLY if tere is no need in a CANCEL rpc to the server;
+ * LDLM_FL_CANCELING otherwise;
+ * LDLM_FL_BL_AST if there is a need in a separate CANCEL rpc. */
+static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
+{
+ int rc = LDLM_FL_LOCAL_ONLY;
+ ENTRY;
+
+ if (lock->l_conn_export) {
+ int local_only;
+
+ LDLM_DEBUG(lock, "client-side cancel");
+ /* Set this flag to prevent others from getting new references*/
+ lock_res_and_lock(lock);
+ lock->l_flags |= LDLM_FL_CBPENDING;
+ local_only = (lock->l_flags &
+ (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
+ ldlm_cancel_callback(lock);
+ rc = (lock->l_flags & LDLM_FL_BL_AST) ?
+ LDLM_FL_BL_AST : LDLM_FL_CANCELING;
+ unlock_res_and_lock(lock);
+
+ if (local_only) {
+ CDEBUG(D_DLMTRACE, "not sending request (at caller's "
+ "instruction)\n");
+ rc = LDLM_FL_LOCAL_ONLY;
+ }
+ ldlm_lock_cancel(lock);
+ } else {
+ if (ns_is_client(lock->l_resource->lr_namespace)) {
+ LDLM_ERROR(lock, "Trying to cancel local lock");
+ LBUG();
+ }
+ LDLM_DEBUG(lock, "server-side local cancel");
+ ldlm_lock_cancel(lock);
+ ldlm_reprocess_all(lock->l_resource);
+ LDLM_DEBUG(lock, "server-side local cancel handler END");
+ }
+
+ RETURN(rc);
+}
+
+/* Pack @count locks in @head into ldlm_request buffer at the offset @off,
+ of the request @req. */
+static void ldlm_cancel_pack(struct ptlrpc_request *req,
+ struct list_head *head, int count)
+{
+ struct ldlm_request *dlm;
+ struct ldlm_lock *lock;
+ int max, packed = 0;
+ ENTRY;
+
+ dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+ LASSERT(dlm != NULL);
+
+ /* Check the room in the request buffer. */
+ max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) -
+ sizeof(struct ldlm_request);
+ max /= sizeof(struct lustre_handle);
+ max += LDLM_LOCKREQ_HANDLES;
+ LASSERT(max >= dlm->lock_count + count);
+
+ /* XXX: it would be better to pack lock handles grouped by resource.
+ * so that the server cancel would call filter_lvbo_update() less
+ * frequently. */
+ list_for_each_entry(lock, head, l_bl_ast) {
+ if (!count--)
+ break;
+ LASSERT(lock->l_conn_export);
+ /* Pack the lock handle to the given request buffer. */
+ LDLM_DEBUG(lock, "packing");
+ dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
+ packed++;
+ }
+ CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
+ EXIT;
+}
+
+/* Prepare and send a batched cancel rpc, it will include count lock handles
+ * of locks given in @head. */
+int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
+ int count, int flags)
+{
+ struct ptlrpc_request *req = NULL;
+ struct obd_import *imp;
+ int free, sent = 0;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(exp != NULL);
+ LASSERT(count > 0);
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, obd_fail_val);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
+ RETURN(count);
+
+ free = ldlm_format_handles_avail(class_exp2cliimp(exp),
+ &RQF_LDLM_CANCEL, RCL_CLIENT, 0);
+ if (count > free)
+ count = free;
+
+ while (1) {
+ int bufcount;
+
+ imp = class_exp2cliimp(exp);
+ if (imp == NULL || imp->imp_invalid) {
+ CDEBUG(D_DLMTRACE,
+ "skipping cancel on invalid import %p\n", imp);
+ RETURN(count);
+ }
+
+ req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
+ if (req == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ bufcount = req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT);
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
+ ldlm_request_bufsize(count, LDLM_CANCEL));
+
+ rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
+ if (rc) {
+ ptlrpc_request_free(req);
+ GOTO(out, rc);
+ }
+ req->rq_no_resend = 1;
+ req->rq_no_delay = 1;
+
+ req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
+ req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
+ ptlrpc_at_set_req_timeout(req);
+
+ ldlm_cancel_pack(req, cancels, count);
+
+ ptlrpc_request_set_replen(req);
+ if (flags & LDLM_FL_ASYNC) {
+ ptlrpcd_add_req(req, PSCOPE_OTHER);
+ sent = count;
+ GOTO(out, 0);
+ } else {
+ rc = ptlrpc_queue_wait(req);
+ }
+ if (rc == ESTALE) {
+ CDEBUG(D_DLMTRACE, "client/server (nid %s) "
+ "out of sync -- not fatal\n",
+ libcfs_nid2str(req->rq_import->
+ imp_connection->c_peer.nid));
+ rc = 0;
+ } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
+ req->rq_import_generation == imp->imp_generation) {
+ ptlrpc_req_finished(req);
+ continue;
+ } else if (rc != ELDLM_OK) {
+ CERROR("Got rc %d from cancel RPC: canceling "
+ "anyway\n", rc);
+ break;
+ }
+ sent = count;
+ break;
+ }
+
+ ptlrpc_req_finished(req);
+ EXIT;
+out:
+ return sent ? sent : rc;
+}
+
+static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
+{
+ LASSERT(imp != NULL);
+ return &imp->imp_obd->obd_namespace->ns_pool;
+}
+
+/**
+ * Update client's obd pool related fields with new SLV and Limit from \a req.
+ */
+int ldlm_cli_update_pool(struct ptlrpc_request *req)
+{
+ struct obd_device *obd;
+ __u64 old_slv, new_slv;
+ __u32 new_limit;
+ ENTRY;
+ if (unlikely(!req->rq_import || !req->rq_import->imp_obd ||
+ !imp_connect_lru_resize(req->rq_import)))
+ {
+ /*
+ * Do nothing for corner cases.
+ */
+ RETURN(0);
+ }
+
+ /*
+ * In some cases RPC may contain slv and limit zeroed out. This is
+ * the case when server does not support lru resize feature. This is
+ * also possible in some recovery cases when server side reqs have no
+ * ref to obd export and thus access to server side namespace is no
+ * possible.
+ */
+ if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
+ lustre_msg_get_limit(req->rq_repmsg) == 0) {
+ DEBUG_REQ(D_HA, req, "Zero SLV or Limit found "
+ "(SLV: "LPU64", Limit: %u)",
+ lustre_msg_get_slv(req->rq_repmsg),
+ lustre_msg_get_limit(req->rq_repmsg));
+ RETURN(0);
+ }