4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 #include <linux/module.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
47 #include "mdc_internal.h"
/*
 * Argument bundle stashed in the request's rq_async_args for an
 * asynchronous getattr intent; unpacked again in
 * mdc_intent_getattr_async_interpret().
 */
49 struct mdc_getattr_args {
50 struct obd_export *ga_exp;
51 struct md_enqueue_info *ga_minfo;
/*
 * Return the error (if any) recorded in @it for the open @phase.
 * Dispositions are checked from most specific to least specific
 * (LEASE -> OPEN -> CREATE -> LOOKUP_EXECD -> IT_EXECD); for each,
 * when @phase has reached that disposition the per-phase result is
 * returned.  NOTE(review): the per-branch return statements are elided
 * in this view, so the exact values returned cannot be confirmed here.
 */
54 int it_open_error(int phase, struct lookup_intent *it)
56 if (it_disposition(it, DISP_OPEN_LEASE)) {
57 if (phase >= DISP_OPEN_LEASE)
62 if (it_disposition(it, DISP_OPEN_OPEN)) {
63 if (phase >= DISP_OPEN_OPEN)
69 if (it_disposition(it, DISP_OPEN_CREATE)) {
70 if (phase >= DISP_OPEN_CREATE)
76 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77 if (phase >= DISP_LOOKUP_EXECD)
83 if (it_disposition(it, DISP_IT_EXECD)) {
84 if (phase >= DISP_IT_EXECD)
/* No known disposition bit set: log the unexpected reply state. */
90 CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
95 EXPORT_SYMBOL(it_open_error);
97 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach inode @data to the resource of the lock behind @lockh, and
 * optionally report the lock's inodebits through @bits.
 *
 * If the resource already caches a different inode, that inode must be
 * on its way out (I_FREEING) — two live inodes must never share one
 * lock resource; the LASSERTF below enforces this.
 */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99 void *data, __u64 *bits)
101 struct ldlm_lock *lock;
102 struct inode *new_inode = data;
/* Nothing to do for an unused (zero) lock handle. */
108 if (!lustre_handle_is_used(lockh))
111 lock = ldlm_handle2lock(lockh);
113 LASSERT(lock != NULL);
114 lock_res_and_lock(lock);
115 if (lock->l_resource->lr_lvb_inode &&
116 lock->l_resource->lr_lvb_inode != data) {
117 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118 LASSERTF(old_inode->i_state & I_FREEING,
119 "Found existing inode %p/%lu/%u state %lu in lock: "
120 "setting data to %p/%lu/%u\n", old_inode,
121 old_inode->i_ino, old_inode->i_generation,
123 new_inode, new_inode->i_ino, new_inode->i_generation);
125 lock->l_resource->lr_lvb_inode = new_inode;
/* Report which inodebits this lock covers, when the caller asked. */
127 *bits = lock->l_policy_data.l_inodebits.bits;
129 unlock_res_and_lock(lock);
/*
 * Look for an already-granted DLM lock on @fid matching
 * @type/@policy/@mode in the client namespace.
 * Inodebits not supported by the server are masked off first (LU-4405)
 * so we never match on bits the server could not have granted.
 * Returns the matched mode (via ldlm_lock_match); the lock handle is
 * returned through @lockh.
 */
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136 const struct lu_fid *fid, enum ldlm_type type,
137 union ldlm_policy_data *policy,
138 enum ldlm_mode mode, struct lustre_handle *lockh)
140 struct ldlm_res_id res_id;
144 fid_build_reg_res_name(fid, &res_id);
145 /* LU-4405: Clear bits not supported by server */
146 policy->l_inodebits.bits &= exp_connect_ibits(exp);
147 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused locks on @fid's resource that match @policy/@mode.
 * Thin wrapper: builds the resource id from the FID and delegates to
 * ldlm_cli_cancel_unused_resource().
 */
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153 union ldlm_policy_data *policy, enum ldlm_mode mode,
154 enum ldlm_cancel_flags flags, void *opaque)
156 struct obd_device *obd = class_exp2obd(exp);
157 struct ldlm_res_id res_id;
162 fid_build_reg_res_name(fid, &res_id);
163 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164 policy, mode, flags, opaque);
/*
 * Drop the cached inode pointer (lr_lvb_inode) from @fid's lock
 * resource, if that resource exists, so the resource no longer
 * references a possibly-stale inode.
 */
168 int mdc_null_inode(struct obd_export *exp,
169 const struct lu_fid *fid)
171 struct ldlm_res_id res_id;
172 struct ldlm_resource *res;
173 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
176 LASSERTF(ns != NULL, "no namespace passed\n");
178 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create flag 0); if the resource is absent there is
 * nothing to clear. */
180 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
185 res->lr_lvb_inode = NULL;
188 ldlm_resource_putref(res);
/*
 * Stop @req from being kept for replay after an error @rc.
 * NOTE(review): the statement that actually clears rq_replay (between
 * the rq_lock lock/unlock pair) is elided in this view — confirm
 * against the full source.
 */
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 /* Don't hold error requests for replay. */
195 if (req->rq_replay) {
196 spin_lock(&req->rq_lock);
198 spin_unlock(&req->rq_lock);
/* A transno on a failed request is unexpected — log it loudly. */
200 if (rc && req->rq_transno != 0) {
201 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
206 /* Save a large LOV EA into the request buffer so that it is available
207 * for replay. We don't do this in the initial request because the
208 * original request doesn't need this buffer (at most it sends just the
209 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210 * buffer and may also be difficult to allocate and save a very large
211 * request buffer for each open. (bug 5707)
213 * OOM here may cause recovery failure if lmm is needed (only for the
214 * original open if the MDS crashed just when this client also OOM'd)
215 * but this is incredibly unlikely, and questionable whether the client
216 * could do MDS recovery under OOM anyways... */
/*
 * Copy @size bytes of @data into @req's client buffer @field,
 * enlarging or shrinking the capsule slot first so it fits exactly.
 */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218 const struct req_msg_field *field,
219 void *data, u32 size)
221 struct req_capsule *pill = &req->rq_pill;
/* Grow the buffer if the client slot is smaller than the EA... */
225 if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
226 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228 CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229 req->rq_export->exp_obd->obd_name,
/* ...or shrink it when it is larger than needed. */
234 req_capsule_shrink(pill, field, size, RCL_CLIENT);
237 req_capsule_set_size(pill, field, RCL_CLIENT, size);
238 lmm = req_capsule_client_get(pill, field);
240 memcpy(lmm, data, size);
/*
 * Build an LDLM_INTENT_OPEN request for @it/@op_data.
 *
 * Collects conflicting local locks to cancel en route (open locks on
 * the child FID, UPDATE lock on the parent for CREATE), sizes the
 * request capsule (name, EA, security context), and packs the intent
 * opcode plus the open body.  Returns the prepared request, or an
 * ERR_PTR on allocation/prep failure.
 */
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247 struct md_op_data *op_data)
249 struct ptlrpc_request *req;
250 struct obd_device *obddev = class_exp2obd(exp);
251 struct ldlm_intent *lit;
252 const void *lmm = op_data->op_data;
253 __u32 lmmsize = op_data->op_data_size;
254 struct list_head cancels = LIST_HEAD_INIT(cancels);
/* Force a regular-file type into the create mode. */
260 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
262 /* XXX: openlock is not cancelled for cross-refs. */
263 /* If inode is known, cancel conflicting OPEN locks. */
264 if (fid_is_sane(&op_data->op_fid2)) {
265 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266 if (it->it_flags & FMODE_WRITE)
271 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
274 else if (it->it_flags & FMODE_EXEC)
280 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
285 /* If CREATE, cancel parent's UPDATE lock. */
286 if (it->it_op & IT_CREAT)
290 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
292 MDS_INODELOCK_UPDATE);
294 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the gathered cancel list first. */
297 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298 RETURN(ERR_PTR(-ENOMEM));
301 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302 op_data->op_namelen + 1);
303 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
304 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
306 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
307 RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
308 strlen(op_data->op_file_secctx_name) + 1 : 0);
310 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
311 op_data->op_file_secctx_size);
313 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
315 ptlrpc_request_free(req);
/* Opens are replayed if the import is replayable (recovery). */
319 spin_lock(&req->rq_lock);
320 req->rq_replay = req->rq_import->imp_replayable;
321 spin_unlock(&req->rq_lock);
323 /* pack the intent */
324 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325 lit->opc = (__u64)it->it_op;
327 /* pack the intended request */
328 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
/* Reserve reply space for the largest possible MD and for ACLs. */
331 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
332 obddev->u.cli.cl_max_mds_easize);
333 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
334 req->rq_import->imp_connect_data.ocd_max_easize);
335 ptlrpc_request_set_replen(req);
339 #define GA_DEFAULT_EA_NAME_LEN 20
340 #define GA_DEFAULT_EA_VAL_LEN 250
341 #define GA_DEFAULT_EA_NUM 10
/*
 * Build an LDLM_INTENT_GETXATTR request: packs the IT_GETXATTR intent
 * and a getxattr body, and reserves reply buffers sized by the
 * GA_DEFAULT_EA_* heuristics (names, values, per-EA length array).
 * Returns the prepared request or an ERR_PTR.
 */
343 static struct ptlrpc_request *
344 mdc_intent_getxattr_pack(struct obd_export *exp,
345 struct lookup_intent *it,
346 struct md_op_data *op_data)
348 struct ptlrpc_request *req;
349 struct ldlm_intent *lit;
351 struct list_head cancels = LIST_HEAD_INIT(cancels);
355 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
356 &RQF_LDLM_INTENT_GETXATTR);
358 RETURN(ERR_PTR(-ENOMEM));
360 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
362 ptlrpc_request_free(req);
366 /* pack the intent */
367 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
368 lit->opc = IT_GETXATTR;
370 /* pack the intended request */
371 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
372 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM,
/* Reply buffers: EA names, EA values, and one __u32 length per EA. */
375 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
376 GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
378 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
379 GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM);
381 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
382 sizeof(__u32) * GA_DEFAULT_EA_NUM);
/* No ACL data is expected in a getxattr reply. */
384 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
386 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_UNLINK request: packs the intent opcode and the
 * unlink body, reserving reply space for the default MD size.
 * Returns the prepared request or an ERR_PTR.
 */
391 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
392 struct lookup_intent *it,
393 struct md_op_data *op_data)
395 struct ptlrpc_request *req;
396 struct obd_device *obddev = class_exp2obd(exp);
397 struct ldlm_intent *lit;
401 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
402 &RQF_LDLM_INTENT_UNLINK);
404 RETURN(ERR_PTR(-ENOMEM));
406 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
407 op_data->op_namelen + 1);
409 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
411 ptlrpc_request_free(req);
415 /* pack the intent */
416 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
417 lit->opc = (__u64)it->it_op;
419 /* pack the intended request */
420 mdc_unlink_pack(req, op_data);
422 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
423 obddev->u.cli.cl_default_mds_easize);
424 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR request asking for attributes, EA,
 * ACL and striping info (see @valid mask).  The EA reply buffer is
 * sized from the client's default (preferred) or maximum MDS EA size.
 * Returns the prepared request or an ERR_PTR.
 */
428 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
429 struct lookup_intent *it,
430 struct md_op_data *op_data)
432 struct ptlrpc_request *req;
433 struct obd_device *obddev = class_exp2obd(exp);
434 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
435 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
436 OBD_MD_MEA | OBD_MD_FLACL;
437 struct ldlm_intent *lit;
442 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
443 &RQF_LDLM_INTENT_GETATTR);
445 RETURN(ERR_PTR(-ENOMEM));
447 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
448 op_data->op_namelen + 1);
450 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
452 ptlrpc_request_free(req);
456 /* pack the intent */
457 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
458 lit->opc = (__u64)it->it_op;
/* Prefer the default EA size when set, else fall back to the max. */
460 if (obddev->u.cli.cl_default_mds_easize > 0)
461 easize = obddev->u.cli.cl_default_mds_easize;
463 easize = obddev->u.cli.cl_max_mds_easize;
465 /* pack the intended request */
466 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
468 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
469 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
470 req->rq_import->imp_connect_data.ocd_max_easize);
471 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT request.  The caller must pass a
 * struct layout_intent in op_data->op_data (asserted below); it is
 * copied verbatim into the request.  Reply LVB space is sized from the
 * client's default MDS EA size.  Returns the request or an ERR_PTR.
 */
475 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
476 struct lookup_intent *it,
477 struct md_op_data *op_data)
479 struct obd_device *obd = class_exp2obd(exp);
480 struct ptlrpc_request *req;
481 struct ldlm_intent *lit;
482 struct layout_intent *layout;
486 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
487 &RQF_LDLM_INTENT_LAYOUT);
489 RETURN(ERR_PTR(-ENOMEM));
/* No EA data is sent with a layout intent. */
491 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
492 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
494 ptlrpc_request_free(req);
498 /* pack the intent */
499 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
500 lit->opc = (__u64)it->it_op;
502 /* pack the layout intent request */
503 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
504 LASSERT(op_data->op_data != NULL);
505 LASSERT(op_data->op_data_size == sizeof(*layout));
506 memcpy(layout, op_data->op_data, sizeof(*layout));
508 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
509 obd->u.cli.cl_default_mds_easize);
510 ptlrpc_request_set_replen(req);
/*
 * Build a plain (intent-less) LDLM_ENQUEUE request with a server-side
 * LVB reply buffer of @lvb_len bytes.  Returns the request or ERR_PTR.
 */
514 static struct ptlrpc_request *
515 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
517 struct ptlrpc_request *req;
521 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
523 RETURN(ERR_PTR(-ENOMEM));
525 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
527 ptlrpc_request_free(req);
531 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
532 ptlrpc_request_set_replen(req);
/*
 * Post-process an intent enqueue reply: fix up lock mode/handle state,
 * copy disposition/status into @it, manage replay flags, and stash any
 * returned LOV EA / layout LVB / Data-on-MDT size in the right places.
 * NOTE(review): many interior lines are elided in this view; the error
 * paths and some assignments cannot be confirmed from here.
 */
536 static int mdc_finish_enqueue(struct obd_export *exp,
537 struct ptlrpc_request *req,
538 struct ldlm_enqueue_info *einfo,
539 struct lookup_intent *it,
540 struct lustre_handle *lockh,
543 struct req_capsule *pill = &req->rq_pill;
544 struct ldlm_request *lockreq;
545 struct ldlm_reply *lockrep;
546 struct ldlm_lock *lock;
547 struct mdt_body *body = NULL;
548 void *lvb_data = NULL;
554 /* Similarly, if we're going to replay this request, we don't want to
555 * actually get a lock, just perform the intent. */
556 if (req->rq_transno || req->rq_replay) {
557 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
558 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Server executed the intent but granted no lock: clear the handle. */
561 if (rc == ELDLM_LOCK_ABORTED) {
563 memset(lockh, 0, sizeof(*lockh));
565 } else { /* rc = 0 */
566 lock = ldlm_handle2lock(lockh);
567 LASSERT(lock != NULL);
569 /* If the server gave us back a different lock mode, we should
570 * fix up our variables. */
571 if (lock->l_req_mode != einfo->ei_mode) {
572 ldlm_lock_addref(lockh, lock->l_req_mode);
573 ldlm_lock_decref(lockh, einfo->ei_mode);
574 einfo->ei_mode = lock->l_req_mode;
579 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
580 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Copy the server's verdict into the intent for the callers above. */
582 it->it_disposition = (int)lockrep->lock_policy_res1;
583 it->it_status = (int)lockrep->lock_policy_res2;
584 it->it_lock_mode = einfo->ei_mode;
585 it->it_lock_handle = lockh->cookie;
586 it->it_request = req;
588 /* Technically speaking rq_transno must already be zero if
589 * it_status is in error, so the check is a bit redundant */
590 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
591 mdc_clear_replay_flag(req, it->it_status);
593 /* If we're doing an IT_OPEN which did not result in an actual
594 * successful open, then we need to remove the bit which saves
595 * this request for unconditional replay.
597 * It's important that we do this first! Otherwise we might exit the
598 * function without doing so, and try to replay a failed create
600 if (it->it_op & IT_OPEN && req->rq_replay &&
601 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
602 mdc_clear_replay_flag(req, it->it_status);
604 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
605 it->it_op, it->it_disposition, it->it_status);
607 /* We know what to expect, so we do any byte flipping required here */
608 if (it_has_reply_body(it)) {
609 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
611 CERROR ("Can't swab mdt_body\n");
615 if (it_disposition(it, DISP_OPEN_OPEN) &&
616 !it_open_error(DISP_OPEN_OPEN, it)) {
618 * If this is a successful OPEN request, we need to set
619 * replay handler and data early, so that if replay
620 * happens immediately after swabbing below, new reply
621 * is swabbed by that handler correctly.
623 mdc_set_open_replay_data(NULL, NULL, it);
626 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
629 mdc_update_max_ea_from_body(exp, body);
632 * The eadata is opaque; just check that it is there.
633 * Eventually, obd_unpackmd() will check the contents.
635 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
636 body->mbo_eadatasize);
640 /* save lvb data and length in case this is for layout
643 lvb_len = body->mbo_eadatasize;
646 * We save the reply LOV EA in case we have to replay a
647 * create for recovery. If we didn't allocate a large
648 * enough request buffer above we need to reallocate it
649 * here to hold the actual LOV EA.
651 * To not save LOV EA if request is not going to replay
652 * (for example error one).
654 if ((it->it_op & IT_OPEN) && req->rq_replay) {
655 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
656 body->mbo_eadatasize);
/* On save failure, drop the EA from the body rather than
 * keep a half-saved one. */
658 body->mbo_valid &= ~OBD_MD_FLEASIZE;
659 body->mbo_eadatasize = 0;
664 } else if (it->it_op & IT_LAYOUT) {
665 /* maybe the lock was granted right away and layout
666 * is packed into RMF_DLM_LVB of req */
667 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
669 lvb_data = req_capsule_server_sized_get(pill,
670 &RMF_DLM_LVB, lvb_len);
671 if (lvb_data == NULL)
675 * save replied layout data to the request buffer for
676 * recovery consideration (lest MDS reinitialize
677 * another set of OST objects).
680 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
685 /* fill in stripe data for layout lock.
686 * LU-6581: trust layout data only if layout lock is granted. The MDT
687 * has stopped sending layout unless the layout lock is granted. The
688 * client still does this checking in case it's talking with an old
689 * server. - Jinshan */
690 lock = ldlm_handle2lock(lockh);
694 if (ldlm_has_layout(lock) && lvb_data != NULL &&
695 !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
698 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
699 ldlm_it2str(it->it_op), lvb_len);
701 OBD_ALLOC_LARGE(lmm, lvb_len);
703 GOTO(out_lock, rc = -ENOMEM);
705 memcpy(lmm, lvb_data, lvb_len);
707 /* install lvb_data */
708 lock_res_and_lock(lock);
709 if (lock->l_lvb_data == NULL) {
710 lock->l_lvb_type = LVB_T_LAYOUT;
711 lock->l_lvb_data = lmm;
712 lock->l_lvb_len = lvb_len;
715 unlock_res_and_lock(lock);
/* Someone raced us installing an LVB: free our copy. */
717 OBD_FREE_LARGE(lmm, lvb_len);
720 if (ldlm_has_dom(lock)) {
721 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
723 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
724 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
725 LDLM_ERROR(lock, "%s: DoM lock without size.\n",
726 exp->exp_obd->obd_name);
727 GOTO(out_lock, rc = -EPROTO);
730 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
731 ldlm_it2str(it->it_op), body->mbo_dom_size);
733 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
741 /* We always reserve enough space in the reply packet for a stripe MD, because
742 * we don't know in advance the file type. */
/*
 * Core enqueue path: select the inodebits policy for the intent, pack
 * the matching intent request, take RPC slots, call ldlm_cli_enqueue(),
 * handle -EINPROGRESS resend and flock special cases, then finish via
 * mdc_finish_enqueue().  NOTE(review): several error/exit paths are
 * elided in this view.
 */
743 static int mdc_enqueue_base(struct obd_export *exp,
744 struct ldlm_enqueue_info *einfo,
745 const union ldlm_policy_data *policy,
746 struct lookup_intent *it,
747 struct md_op_data *op_data,
748 struct lustre_handle *lockh,
749 __u64 extra_lock_flags)
751 struct obd_device *obddev = class_exp2obd(exp);
752 struct ptlrpc_request *req = NULL;
753 __u64 flags, saved_flags = extra_lock_flags;
754 struct ldlm_res_id res_id;
755 static const union ldlm_policy_data lookup_policy = {
756 .l_inodebits = { MDS_INODELOCK_LOOKUP } };
757 static const union ldlm_policy_data update_policy = {
758 .l_inodebits = { MDS_INODELOCK_UPDATE } };
759 static const union ldlm_policy_data layout_policy = {
760 .l_inodebits = { MDS_INODELOCK_LAYOUT } };
761 static const union ldlm_policy_data getxattr_policy = {
762 .l_inodebits = { MDS_INODELOCK_XATTR } };
763 int generation, resends = 0;
764 struct ldlm_reply *lockrep;
765 enum lvb_type lvb_type = 0;
769 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
771 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent, the policy is derived from it_op, not passed in. */
774 LASSERT(policy == NULL);
776 saved_flags |= LDLM_FL_HAS_INTENT;
777 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
778 policy = &update_policy;
779 else if (it->it_op & IT_LAYOUT)
780 policy = &layout_policy;
781 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
782 policy = &getxattr_policy;
784 policy = &lookup_policy;
/* Remember the import generation to detect eviction across resends. */
787 generation = obddev->u.cli.cl_import->imp_generation;
791 /* The only way right now is FLOCK. */
792 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
794 res_id.name[3] = LDLM_FLOCK;
795 } else if (it->it_op & IT_OPEN) {
796 req = mdc_intent_open_pack(exp, it, op_data);
797 } else if (it->it_op & IT_UNLINK) {
798 req = mdc_intent_unlink_pack(exp, it, op_data);
799 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
800 req = mdc_intent_getattr_pack(exp, it, op_data);
801 } else if (it->it_op & IT_READDIR) {
802 req = mdc_enqueue_pack(exp, 0);
803 } else if (it->it_op & IT_LAYOUT) {
804 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
806 req = mdc_intent_layout_pack(exp, it, op_data);
807 lvb_type = LVB_T_LAYOUT;
808 } else if (it->it_op & IT_GETXATTR) {
809 req = mdc_intent_getxattr_pack(exp, it, op_data);
816 RETURN(PTR_ERR(req));
/* Pin the request to this import generation for resend tracking. */
819 req->rq_generation_set = 1;
820 req->rq_import_generation = generation;
821 req->rq_sent = ktime_get_real_seconds() + resends;
824 /* It is important to obtain modify RPC slot first (if applicable), so
825 * that threads that are waiting for a modify RPC slot are not polluting
826 * our rpcs in flight counter.
827 * We do not do flock request limiting, though */
829 mdc_get_mod_rpc_slot(req, it);
830 rc = obd_get_request_slot(&obddev->u.cli);
832 mdc_put_mod_rpc_slot(req, it);
833 mdc_clear_replay_flag(req, 0);
834 ptlrpc_req_finished(req);
839 /* With Data-on-MDT the glimpse callback is needed too.
840 * It is set here in advance but not in mdc_finish_enqueue()
841 * to avoid possible races. It is safe to have glimpse handler
842 * for non-DOM locks and costs nothing.*/
843 if (einfo->ei_cb_gl == NULL)
844 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
846 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
847 0, lvb_type, lockh, 0);
849 /* For flock requests we immediatelly return without further
850 delay and let caller deal with the rest, since rest of
851 this function metadata processing makes no sense for flock
852 requests anyway. But in case of problem during comms with
853 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
854 can not rely on caller and this mainly for F_UNLCKs
855 (explicits or automatically generated by Kernel to clean
856 current FLocks upon exit) that can't be trashed */
857 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
858 (einfo->ei_type == LDLM_FLOCK) &&
859 (einfo->ei_mode == LCK_NL))
/* Release the slots taken before the enqueue. */
864 obd_put_request_slot(&obddev->u.cli);
865 mdc_put_mod_rpc_slot(req, it);
868 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
869 obddev->obd_name, rc);
871 mdc_clear_replay_flag(req, rc);
872 ptlrpc_req_finished(req);
876 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
877 LASSERT(lockrep != NULL);
879 lockrep->lock_policy_res2 =
880 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
882 /* Retry infinitely when the server returns -EINPROGRESS for the
883 * intent operation, when server returns -EINPROGRESS for acquiring
884 * intent lock, we'll retry in after_reply(). */
885 if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
886 mdc_clear_replay_flag(req, rc);
887 ptlrpc_req_finished(req);
890 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
891 obddev->obd_name, resends, it->it_op,
892 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
/* Only resend within the same import generation; an eviction
 * in between means the resend must be abandoned. */
894 if (generation == obddev->u.cli.cl_import->imp_generation) {
897 CDEBUG(D_HA, "resend cross eviction\n");
902 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* On finish failure, drop any lock reference we still hold and
 * reset the intent's lock state. */
904 if (lustre_handle_is_used(lockh)) {
905 ldlm_lock_decref(lockh, einfo->ei_mode);
906 memset(lockh, 0, sizeof(*lockh));
908 ptlrpc_req_finished(req);
910 it->it_lock_handle = 0;
911 it->it_lock_mode = 0;
912 it->it_request = NULL;
/*
 * Public enqueue entry point without an intent: forwards to
 * mdc_enqueue_base() with it == NULL.
 */
918 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
919 const union ldlm_policy_data *policy,
920 struct md_op_data *op_data,
921 struct lustre_handle *lockh, __u64 extra_lock_flags)
923 return mdc_enqueue_base(exp, einfo, policy, NULL,
924 op_data, lockh, extra_lock_flags);
/*
 * Translate an intent reply into VFS-visible state: propagate per-phase
 * open/lookup errors, take extra request references for CREATE/OPEN so
 * the reply survives until ll_create_node()/ll_file_open(), and merge
 * the new lock with any matching lock we already hold.
 * NOTE(review): several branches are elided in this view.
 */
927 static int mdc_finish_intent_lock(struct obd_export *exp,
928 struct ptlrpc_request *request,
929 struct md_op_data *op_data,
930 struct lookup_intent *it,
931 struct lustre_handle *lockh)
933 struct lustre_handle old_lock;
934 struct ldlm_lock *lock;
938 LASSERT(request != NULL);
939 LASSERT(request != LP_POISON);
940 LASSERT(request->rq_repmsg != LP_POISON);
/* READDIR carries no per-entry intent result to process. */
942 if (it->it_op & IT_READDIR)
945 if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
946 if (it->it_status != 0)
947 GOTO(out, rc = it->it_status);
949 if (!it_disposition(it, DISP_IT_EXECD)) {
950 /* The server failed before it even started executing
951 * the intent, i.e. because it couldn't unpack the
954 LASSERT(it->it_status != 0);
955 GOTO(out, rc = it->it_status);
957 rc = it_open_error(DISP_IT_EXECD, it);
961 rc = it_open_error(DISP_LOOKUP_EXECD, it);
965 /* keep requests around for the multiple phases of the call
966 * this shows the DISP_XX must guarantee we make it into the
969 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
970 it_disposition(it, DISP_OPEN_CREATE) &&
971 !it_open_error(DISP_OPEN_CREATE, it)) {
972 it_set_disposition(it, DISP_ENQ_CREATE_REF);
973 /* balanced in ll_create_node */
974 ptlrpc_request_addref(request);
976 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
977 it_disposition(it, DISP_OPEN_OPEN) &&
978 !it_open_error(DISP_OPEN_OPEN, it)) {
979 it_set_disposition(it, DISP_ENQ_OPEN_REF);
980 /* balanced in ll_file_open */
981 ptlrpc_request_addref(request);
982 /* BUG 11546 - eviction in the middle of open rpc
985 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
989 if (it->it_op & IT_CREAT) {
990 /* XXX this belongs in ll_create_it */
991 } else if (it->it_op == IT_OPEN) {
992 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
994 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
998 /* If we already have a matching lock, then cancel the new
999 * one. We have to set the data here instead of in
1000 * mdc_enqueue, because we need to use the child's inode as
1001 * the l_ast_data to match, and that's not available until
1002 * intent_finish has performed the iget().) */
1003 lock = ldlm_handle2lock(lockh);
1005 union ldlm_policy_data policy = lock->l_policy_data;
1006 LDLM_DEBUG(lock, "matching against this");
1008 if (it_has_reply_body(it)) {
1009 struct mdt_body *body;
1011 body = req_capsule_server_get(&request->rq_pill,
1013 /* mdc_enqueue checked */
1014 LASSERT(body != NULL);
/* Sanity: the reply's FID must match the lock's resource. */
1015 LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1016 &lock->l_resource->lr_name),
1017 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1018 PLDLMRES(lock->l_resource),
1019 PFID(&body->mbo_fid1));
1021 LDLM_LOCK_PUT(lock);
1023 memcpy(&old_lock, lockh, sizeof(*lockh));
1024 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1025 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Keep the pre-existing lock and cancel the new duplicate. */
1026 ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1027 memcpy(lockh, &old_lock, sizeof(old_lock));
1028 it->it_lock_handle = lockh->cookie;
1034 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1035 (int)op_data->op_namelen, op_data->op_name,
1036 ldlm_it2str(it->it_op), it->it_status,
1037 it->it_disposition, rc);
/*
 * Check whether we still hold a DLM lock covering @fid for the intent
 * @it.  If the intent carries a lock handle, revalidate it directly;
 * otherwise build the inodebits policy required by it_op and try to
 * match an existing granted lock.  On success the intent's lock
 * handle/mode are filled in; on failure they are zeroed.
 */
1041 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1042 struct lu_fid *fid, __u64 *bits)
1044 /* We could just return 1 immediately, but since we should only
1045 * be called in revalidate_it if we already have a lock, let's
1047 struct ldlm_res_id res_id;
1048 struct lustre_handle lockh;
1049 union ldlm_policy_data policy;
1050 enum ldlm_mode mode;
1053 if (it->it_lock_handle) {
1054 lockh.cookie = it->it_lock_handle;
1055 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1057 fid_build_reg_res_name(fid, &res_id);
1058 switch (it->it_op) {
1060 /* File attributes are held under multiple bits:
1061 * nlink is under lookup lock, size and times are
1062 * under UPDATE lock and recently we've also got
1063 * a separate permissions lock for owner/group/acl that
1064 * were protected by lookup lock before.
1065 * Getattr must provide all of that information,
1066 * so we need to ensure we have all of those locks.
1067 * Unfortunately, if the bits are split across multiple
1068 * locks, there's no easy way to match all of them here,
1069 * so an extra RPC would be performed to fetch all
1070 * of those bits at once for now. */
1071 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1072 * but for old MDTs (< 2.4), permission is covered
1073 * by LOOKUP lock, so it needs to match all bits here.*/
1074 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1075 MDS_INODELOCK_LOOKUP |
1079 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1082 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1085 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Accept any read/write mode already granted on the resource. */
1089 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1090 LDLM_IBITS, &policy,
1091 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1096 it->it_lock_handle = lockh.cookie;
1097 it->it_lock_mode = mode;
/* No usable lock: clear the intent's lock state. */
1099 it->it_lock_handle = 0;
1100 it->it_lock_mode = 0;
1107 * This long block is all about fixing up the lock and request state
1108 * so that it is correct as of the moment _before_ the operation was
1109 * applied; that way, the VFS will think that everything is normal and
1110 * call Lustre's regular VFS methods.
1112 * If we're performing a creation, that means that unless the creation
1113 * failed with EEXIST, we should fake up a negative dentry.
1115 * For everything else, we want to lookup to succeed.
1117 * One additional note: if CREATE or OPEN succeeded, we add an extra
1118 * reference to the request because we need to keep it around until
1119 * ll_create/ll_open gets called.
1121 * The server will return to us, in it_disposition, an indication of
1122 * exactly what it_status refers to.
1124 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1125 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1126 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1127 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1130 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
/*
 * Top-level intent-lock entry: try revalidating an existing lock for
 * LOOKUP/GETATTR/READDIR on a sane child FID, allocate a FID for
 * CREATE when the upper layer did not, then enqueue the intent and
 * finish via mdc_finish_intent_lock().  *reqp receives the reply
 * request on success.
 */
1133 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1134 struct lookup_intent *it, struct ptlrpc_request **reqp,
1135 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1137 struct ldlm_enqueue_info einfo = {
1138 .ei_type = LDLM_IBITS,
1139 .ei_mode = it_to_lock_mode(it),
1140 .ei_cb_bl = cb_blocking,
1141 .ei_cb_cp = ldlm_completion_ast,
1142 .ei_cb_gl = mdc_ldlm_glimpse_ast,
1144 struct lustre_handle lockh;
1149 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1150 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1151 op_data->op_name, PFID(&op_data->op_fid2),
1152 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1156 if (fid_is_sane(&op_data->op_fid2) &&
1157 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1158 /* We could just return 1 immediately, but since we should only
1159 * be called in revalidate_it if we already have a lock, let's
1161 it->it_lock_handle = 0;
1162 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1163 /* Only return failure if it was not GETATTR by cfid
1164 (from inode_revalidate) */
1165 if (rc || op_data->op_namelen != 0)
1169 /* For case if upper layer did not alloc fid, do it now. */
1170 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1171 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1173 CERROR("Can't alloc new fid, rc %d\n", rc);
1178 rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1183 *reqp = it->it_request;
1184 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Interpret callback for the async getattr intent: release the request
 * slot, finalize the enqueue (ldlm_cli_enqueue_fini), then run the
 * usual mdc_finish_enqueue()/mdc_finish_intent_lock() processing and
 * invoke the caller's mi_cb with the final rc.
 */
1188 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1189 struct ptlrpc_request *req,
1192 struct mdc_getattr_args *ga = args;
1193 struct obd_export *exp = ga->ga_exp;
1194 struct md_enqueue_info *minfo = ga->ga_minfo;
1195 struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1196 struct lookup_intent *it;
1197 struct lustre_handle *lockh;
1198 struct obd_device *obddev;
1199 struct ldlm_reply *lockrep;
1200 __u64 flags = LDLM_FL_HAS_INTENT;
1204 lockh = &minfo->mi_lockh;
1206 obddev = class_exp2obd(exp);
/* Balance the slot taken in mdc_intent_getattr_async(). */
1208 obd_put_request_slot(&obddev->u.cli);
1209 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1212 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1213 &flags, NULL, 0, lockh, rc);
1215 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1216 mdc_clear_replay_flag(req, rc);
1220 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1221 LASSERT(lockrep != NULL);
/* Convert the server's wire status to host errno convention. */
1223 lockrep->lock_policy_res2 =
1224 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1226 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1230 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1234 minfo->mi_cb(req, minfo, rc);
/*
 * Fire an asynchronous getattr intent enqueue (used by statahead).
 * Packs a getattr intent, takes a request slot, issues the enqueue in
 * async mode (last arg of ldlm_cli_enqueue == 1), and arranges for
 * mdc_intent_getattr_async_interpret() to complete it via ptlrpcd.
 */
1238 int mdc_intent_getattr_async(struct obd_export *exp,
1239 struct md_enqueue_info *minfo)
1241 struct md_op_data *op_data = &minfo->mi_data;
1242 struct lookup_intent *it = &minfo->mi_it;
1243 struct ptlrpc_request *req;
1244 struct mdc_getattr_args *ga;
1245 struct obd_device *obddev = class_exp2obd(exp);
1246 struct ldlm_res_id res_id;
1247 union ldlm_policy_data policy = {
1248 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1249 MDS_INODELOCK_UPDATE } };
1251 __u64 flags = LDLM_FL_HAS_INTENT;
1254 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1255 (int)op_data->op_namelen, op_data->op_name,
1256 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1258 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1259 req = mdc_intent_getattr_pack(exp, it, op_data);
1261 RETURN(PTR_ERR(req));
1263 rc = obd_get_request_slot(&obddev->u.cli);
1265 ptlrpc_req_finished(req);
1269 /* With Data-on-MDT the glimpse callback is needed too.
1270 * It is set here in advance but not in mdc_finish_enqueue()
1271 * to avoid possible races. It is safe to have glimpse handler
1272 * for non-DOM locks and costs nothing.*/
1273 if (minfo->mi_einfo.ei_cb_gl == NULL)
1274 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1276 rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1277 &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
/* Enqueue failed: return the slot and drop the request. */
1279 obd_put_request_slot(&obddev->u.cli);
1280 ptlrpc_req_finished(req);
/* Stash callback args in the request and hand it to ptlrpcd. */
1284 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1285 ga = ptlrpc_req_async_args(req);
1287 ga->ga_minfo = minfo;
1289 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1290 ptlrpcd_add_req(req);