4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
43 # include <liblustre.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
/* Per-request context for an asynchronous intent-getattr RPC: filled in
 * by mdc_intent_getattr_async() and unpacked again in
 * mdc_intent_getattr_async_interpret(). */
55 struct mdc_getattr_args {
56 struct obd_export *ga_exp; /* export the enqueue was sent on */
57 struct md_enqueue_info *ga_minfo; /* caller's info; completed via minfo->mi_cb */
58 struct ldlm_enqueue_info *ga_einfo; /* freed (OBD_FREE_PTR) in the interpret callback */
/* Return the server status recorded in @it for the given intent @phase.
 * Dispositions are checked from most specific (lease) down to least
 * (intent-executed); once a set disposition bit matches and the caller
 * asked about that phase or a later one, the saved it_status is returned.
 * Reaching the CERROR below means none of the known disposition bits
 * were set in the reply.  (Some lines are elided from this view,
 * including the early-out returns between the branches.) */
61 int it_open_error(int phase, struct lookup_intent *it)
63 if (it_disposition(it, DISP_OPEN_LEASE)) {
64 if (phase >= DISP_OPEN_LEASE)
65 return it->d.lustre.it_status;
69 if (it_disposition(it, DISP_OPEN_OPEN)) {
70 if (phase >= DISP_OPEN_OPEN)
71 return it->d.lustre.it_status;
76 if (it_disposition(it, DISP_OPEN_CREATE)) {
77 if (phase >= DISP_OPEN_CREATE)
78 return it->d.lustre.it_status;
83 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
84 if (phase >= DISP_LOOKUP_EXECD)
85 return it->d.lustre.it_status;
90 if (it_disposition(it, DISP_IT_EXECD)) {
91 if (phase >= DISP_IT_EXECD)
92 return it->d.lustre.it_status;
/* No recognised disposition bit: log the raw disposition/status. */
96 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
97 it->d.lustre.it_status);
101 EXPORT_SYMBOL(it_open_error);
103 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach the inode @data to the lock's resource (lr_lvb_inode) under the
 * resource lock, and report the lock's inodebits through *bits.  An
 * already-attached, different inode may only be displaced while it is
 * being torn down (I_FREEING) — otherwise the LASSERTF fires, since two
 * live inodes must never share one metadata lock resource. */
104 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
107 struct ldlm_lock *lock;
108 struct inode *new_inode = data;
117 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
119 LASSERT(lock != NULL);
120 lock_res_and_lock(lock);
122 if (lock->l_resource->lr_lvb_inode &&
123 lock->l_resource->lr_lvb_inode != data) {
124 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
125 LASSERTF(old_inode->i_state & I_FREEING,
126 "Found existing inode %p/%lu/%u state %lu in lock: "
127 "setting data to %p/%lu/%u\n", old_inode,
128 old_inode->i_ino, old_inode->i_generation,
130 new_inode, new_inode->i_ino, new_inode->i_generation);
133 lock->l_resource->lr_lvb_inode = new_inode;
135 *bits = lock->l_policy_data.l_inodebits.bits;
137 unlock_res_and_lock(lock);
/* Look for an already-granted MDC lock on @fid matching @type/@policy/
 * @mode in this export's namespace; on success the handle is stored in
 * @lockh and the matched mode returned by ldlm_lock_match(). */
143 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
144 const struct lu_fid *fid, ldlm_type_t type,
145 ldlm_policy_data_t *policy, ldlm_mode_t mode,
146 struct lustre_handle *lockh)
148 struct ldlm_res_id res_id;
152 fid_build_reg_res_name(fid, &res_id);
153 /* LU-4405: Clear bits not supported by server */
154 policy->l_inodebits.bits &= exp_connect_ibits(exp);
155 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
156 &res_id, type, policy, mode, lockh, 0);
/* Cancel all unused locks on the resource built from @fid that match
 * @policy/@mode, delegating to ldlm_cli_cancel_unused_resource(). */
160 int mdc_cancel_unused(struct obd_export *exp,
161 const struct lu_fid *fid,
162 ldlm_policy_data_t *policy,
164 ldlm_cancel_flags_t flags,
167 struct ldlm_res_id res_id;
168 struct obd_device *obd = class_exp2obd(exp);
173 fid_build_reg_res_name(fid, &res_id);
174 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
175 policy, mode, flags, opaque);
/* Detach any inode cached on @fid's lock resource by clearing
 * lr_lvb_inode; used when the inode is going away.  The resource
 * reference taken by ldlm_resource_get() is dropped before returning.
 * (The NULL-resource early return is elided from this view.) */
179 int mdc_null_inode(struct obd_export *exp,
180 const struct lu_fid *fid)
182 struct ldlm_res_id res_id;
183 struct ldlm_resource *res;
184 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
187 LASSERTF(ns != NULL, "no namespace passed\n");
189 fid_build_reg_res_name(fid, &res_id);
191 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
196 res->lr_lvb_inode = NULL;
199 ldlm_resource_putref(res);
203 /* find any ldlm lock of the inode in mdc
/* Iterate the locks on @fid's resource with @it/@data; translates the
 * iterator verdict (LDLM_ITER_STOP / LDLM_ITER_CONTINUE) into this
 * function's return value — the actual returns are elided here. */
207 int mdc_find_cbdata(struct obd_export *exp,
208 const struct lu_fid *fid,
209 ldlm_iterator_t it, void *data)
211 struct ldlm_res_id res_id;
215 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
216 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
218 if (rc == LDLM_ITER_STOP)
220 else if (rc == LDLM_ITER_CONTINUE)
/* Drop the replay flag on @req so a failed request is not held for
 * recovery replay; rq_replay is cleared under rq_lock.  A nonzero
 * transno together with an error @rc is unexpected and logged. */
225 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
227 /* Don't hold error requests for replay. */
228 if (req->rq_replay) {
229 spin_lock(&req->rq_lock);
231 spin_unlock(&req->rq_lock);
233 if (rc && req->rq_transno != 0) {
234 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
239 /* Save a large LOV EA into the request buffer so that it is available
240 * for replay. We don't do this in the initial request because the
241 * original request doesn't need this buffer (at most it sends just the
242 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
243 * buffer and may also be difficult to allocate and save a very large
244 * request buffer for each open. (bug 5707)
246 * OOM here may cause recovery failure if lmm is needed (only for the
247 * original open if the MDS crashed just when this client also OOM'd)
248 * but this is incredibly unlikely, and questionable whether the client
249 * could do MDS recovery under OOM anyways... */
/* Grow the open request's EA buffer (segment DLM_INTENT_REC_OFF + 4) to
 * body->eadatasize so the reply LOV EA can be stashed for replay; on
 * failure the EA is simply dropped from the body (OBD_MD_FLEASIZE
 * cleared) rather than failing the open — see the block comment above. */
250 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
251 struct mdt_body *body)
255 /* FIXME: remove this explicit offset. */
256 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
259 CERROR("Can't enlarge segment %d size to %d\n",
260 DLM_INTENT_REC_OFF + 4, body->eadatasize);
261 body->valid &= ~OBD_MD_FLEASIZE;
262 body->eadatasize = 0;
/* Build an LDLM_INTENT_OPEN request for @it/@op_data: collect conflicting
 * locks to cancel (child OPEN/lease locks if fid2 is known, parent UPDATE
 * lock for create), allocate and size the request, mark it replayable,
 * and pack the ldlm intent plus the open body.  Returns the prepared
 * request or ERR_PTR on allocation/prep failure (error paths partially
 * elided from this view). */
266 static struct ptlrpc_request *
267 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
268 struct md_op_data *op_data)
270 struct ptlrpc_request *req;
271 struct obd_device *obddev = class_exp2obd(exp);
272 struct ldlm_intent *lit;
273 const void *lmm = op_data->op_data;
274 int lmmsize = op_data->op_data_size;
275 struct list_head cancels;
281 INIT_LIST_HEAD(&cancels);
/* Open always targets a regular file mode here. */
283 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
285 /* XXX: openlock is not cancelled for cross-refs. */
286 /* If inode is known, cancel conflicting OPEN locks. */
287 if (fid_is_sane(&op_data->op_fid2)) {
288 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
289 if (it->it_flags & FMODE_WRITE)
294 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
297 else if (it->it_flags & FMODE_EXEC)
303 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
308 /* If CREATE, cancel parent's UPDATE lock. */
309 if (it->it_op & IT_CREAT)
313 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
315 MDS_INODELOCK_UPDATE);
317 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
318 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the collected cancel list first. */
320 ldlm_lock_list_put(&cancels, l_bl_ast, count);
321 RETURN(ERR_PTR(-ENOMEM));
324 /* parent capability */
325 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
326 /* child capability, reserve the size according to parent capa, it will
327 * be filled after we get the reply */
328 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
330 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
331 op_data->op_namelen + 1);
332 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
333 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
335 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
337 ptlrpc_request_free(req);
/* Opens are replayed if the import is replayable (recovery). */
341 spin_lock(&req->rq_lock);
342 req->rq_replay = req->rq_import->imp_replayable;
343 spin_unlock(&req->rq_lock);
345 /* pack the intent */
346 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
347 lit->opc = (__u64)it->it_op;
349 /* pack the intended request */
350 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
/* Reply may carry a stripe MD of up to the max EA size. */
353 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
354 obddev->u.cli.cl_max_mds_easize);
356 /* for remote client, fetch remote perm for current user */
357 if (client_is_remote(exp))
358 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
359 sizeof(struct mdt_remote_perm));
360 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETXATTR request: pack the IT_GETXATTR intent and
 * a getattr-style body, and reserve reply room (EADATA/EAVALS/
 * EAVALS_LENS) sized by the server's advertised ocd_max_easize.
 * Returns the request or ERR_PTR(-ENOMEM). */
364 static struct ptlrpc_request *
365 mdc_intent_getxattr_pack(struct obd_export *exp,
366 struct lookup_intent *it,
367 struct md_op_data *op_data)
369 struct ptlrpc_request *req;
370 struct ldlm_intent *lit;
371 int rc, count = 0, maxdata;
372 CFS_LIST_HEAD(cancels);
376 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
377 &RQF_LDLM_INTENT_GETXATTR);
379 RETURN(ERR_PTR(-ENOMEM));
381 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
383 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
385 ptlrpc_request_free(req);
389 /* pack the intent */
390 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
391 lit->opc = IT_GETXATTR;
/* Cap reply buffers at what the server said it can send. */
393 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
395 /* pack the intended request */
396 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
397 op_data->op_valid, maxdata, -1, 0);
399 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
400 RCL_SERVER, maxdata);
402 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
403 RCL_SERVER, maxdata);
405 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
406 RCL_SERVER, maxdata);
408 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_UNLINK request: pack the intent opcode, the
 * unlink body via mdc_unlink_pack(), and reserve reply room for stripe
 * MD and unlink cookies using the client defaults. */
413 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
414 struct lookup_intent *it,
415 struct md_op_data *op_data)
417 struct ptlrpc_request *req;
418 struct obd_device *obddev = class_exp2obd(exp);
419 struct ldlm_intent *lit;
423 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
424 &RQF_LDLM_INTENT_UNLINK);
426 RETURN(ERR_PTR(-ENOMEM));
428 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
429 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
430 op_data->op_namelen + 1);
432 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
434 ptlrpc_request_free(req);
438 /* pack the intent */
439 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
440 lit->opc = (__u64)it->it_op;
442 /* pack the intended request */
443 mdc_unlink_pack(req, op_data);
445 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
446 obddev->u.cli.cl_default_mds_easize);
447 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
448 obddev->u.cli.cl_default_mds_cookiesize);
449 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETATTR request asking for attrs, EA, capa and
 * either remote perms (remote client) or ACLs.  The EA reply buffer is
 * sized by cl_default_mds_easize when set, else cl_max_mds_easize. */
453 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
454 struct lookup_intent *it,
455 struct md_op_data *op_data)
457 struct ptlrpc_request *req;
458 struct obd_device *obddev = class_exp2obd(exp);
459 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
460 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
461 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
462 (client_is_remote(exp) ?
463 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
464 struct ldlm_intent *lit;
469 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
470 &RQF_LDLM_INTENT_GETATTR);
472 RETURN(ERR_PTR(-ENOMEM));
474 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
475 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
476 op_data->op_namelen + 1);
478 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
480 ptlrpc_request_free(req);
484 /* pack the intent */
485 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
486 lit->opc = (__u64)it->it_op;
/* Prefer the tunable default EA size; fall back to the maximum. */
488 if (obddev->u.cli.cl_default_mds_easize > 0)
489 easize = obddev->u.cli.cl_default_mds_easize;
491 easize = obddev->u.cli.cl_max_mds_easize;
493 /* pack the intended request */
494 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
496 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
497 if (client_is_remote(exp))
498 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
499 sizeof(struct mdt_remote_perm));
500 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_LAYOUT request.  The layout intent opcode is
 * always LAYOUT_INTENT_ACCESS here; the reply LVB buffer is sized by the
 * client default EA size so the layout can be returned inline. */
504 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
505 struct lookup_intent *it,
506 struct md_op_data *unused)
508 struct obd_device *obd = class_exp2obd(exp);
509 struct ptlrpc_request *req;
510 struct ldlm_intent *lit;
511 struct layout_intent *layout;
515 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
516 &RQF_LDLM_INTENT_LAYOUT);
518 RETURN(ERR_PTR(-ENOMEM));
/* No EA payload is sent with a layout intent. */
520 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
521 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
523 ptlrpc_request_free(req);
527 /* pack the intent */
528 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
529 lit->opc = (__u64)it->it_op;
531 /* pack the layout intent request */
532 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
533 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
534 * set for replication */
535 layout->li_opc = LAYOUT_INTENT_ACCESS;
537 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
538 obd->u.cli.cl_default_mds_easize);
539 ptlrpc_request_set_replen(req);
/* Build a plain (intent-less) LDLM_ENQUEUE request with an @lvb_len-byte
 * reply LVB buffer; used e.g. for IT_READDIR.  Returns the request or
 * ERR_PTR(-ENOMEM). */
543 static struct ptlrpc_request *
544 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
546 struct ptlrpc_request *req;
550 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
552 RETURN(ERR_PTR(-ENOMEM));
554 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
556 ptlrpc_request_free(req);
560 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
561 ptlrpc_request_set_replen(req);
/* Post-process a completed intent enqueue: fix up lock state for replayed
 * or aborted enqueues, copy the server's disposition/status into the
 * intent, clear the replay flag for failed opens, sanity-check the reply
 * body and EA data, stash the reply LOV EA back into the request buffer
 * for open replay, and install the layout LVB on a layout lock.
 * NOTE(review): many error-path lines are elided from this view; the
 * comments below describe only what is visible. */
565 static int mdc_finish_enqueue(struct obd_export *exp,
566 struct ptlrpc_request *req,
567 struct ldlm_enqueue_info *einfo,
568 struct lookup_intent *it,
569 struct lustre_handle *lockh,
572 struct req_capsule *pill = &req->rq_pill;
573 struct ldlm_request *lockreq;
574 struct ldlm_reply *lockrep;
575 struct lustre_intent_data *intent = &it->d.lustre;
576 struct ldlm_lock *lock;
577 void *lvb_data = NULL;
582 /* Similarly, if we're going to replay this request, we don't want to
583 * actually get a lock, just perform the intent. */
584 if (req->rq_transno || req->rq_replay) {
585 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
586 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
589 if (rc == ELDLM_LOCK_ABORTED) {
/* Intent executed but no lock granted: invalidate the handle. */
591 memset(lockh, 0, sizeof(*lockh));
593 } else { /* rc = 0 */
594 lock = ldlm_handle2lock(lockh);
595 LASSERT(lock != NULL);
597 /* If the server gave us back a different lock mode, we should
598 * fix up our variables. */
599 if (lock->l_req_mode != einfo->ei_mode) {
600 ldlm_lock_addref(lockh, lock->l_req_mode);
601 ldlm_lock_decref(lockh, einfo->ei_mode);
602 einfo->ei_mode = lock->l_req_mode;
607 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
608 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Copy the server's verdict into the caller's intent. */
610 intent->it_disposition = (int)lockrep->lock_policy_res1;
611 intent->it_status = (int)lockrep->lock_policy_res2;
612 intent->it_lock_mode = einfo->ei_mode;
613 intent->it_lock_handle = lockh->cookie;
614 intent->it_data = req;
616 /* Technically speaking rq_transno must already be zero if
617 * it_status is in error, so the check is a bit redundant */
618 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
619 mdc_clear_replay_flag(req, intent->it_status);
621 /* If we're doing an IT_OPEN which did not result in an actual
622 * successful open, then we need to remove the bit which saves
623 * this request for unconditional replay.
625 * It's important that we do this first! Otherwise we might exit the
626 * function without doing so, and try to replay a failed create
628 if (it->it_op & IT_OPEN && req->rq_replay &&
629 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
630 mdc_clear_replay_flag(req, intent->it_status);
632 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
633 it->it_op, intent->it_disposition, intent->it_status);
635 /* We know what to expect, so we do any byte flipping required here */
636 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
637 struct mdt_body *body;
639 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
641 CERROR ("Can't swab mdt_body\n");
645 if (it_disposition(it, DISP_OPEN_OPEN) &&
646 !it_open_error(DISP_OPEN_OPEN, it)) {
648 * If this is a successful OPEN request, we need to set
649 * replay handler and data early, so that if replay
650 * happens immediately after swabbing below, new reply
651 * is swabbed by that handler correctly.
653 mdc_set_open_replay_data(NULL, NULL, it);
656 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
/* Server may advertise a larger EA size; track it. */
659 mdc_update_max_ea_from_body(exp, body);
662 * The eadata is opaque; just check that it is there.
663 * Eventually, obd_unpackmd() will check the contents.
665 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
670 /* save lvb data and length in case this is for layout
673 lvb_len = body->eadatasize;
676 * We save the reply LOV EA in case we have to replay a
677 * create for recovery. If we didn't allocate a large
678 * enough request buffer above we need to reallocate it
679 * here to hold the actual LOV EA.
681 * To not save LOV EA if request is not going to replay
682 * (for example error one).
684 if ((it->it_op & IT_OPEN) && req->rq_replay) {
686 if (req_capsule_get_size(pill, &RMF_EADATA,
689 mdc_realloc_openmsg(req, body);
691 req_capsule_shrink(pill, &RMF_EADATA,
695 req_capsule_set_size(pill, &RMF_EADATA,
699 lmm = req_capsule_client_get(pill, &RMF_EADATA);
701 memcpy(lmm, eadata, body->eadatasize);
705 if (body->valid & OBD_MD_FLRMTPERM) {
706 struct mdt_remote_perm *perm;
708 LASSERT(client_is_remote(exp));
709 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
710 lustre_swab_mdt_remote_perm);
714 if (body->valid & OBD_MD_FLMDSCAPA) {
715 struct lustre_capa *capa, *p;
717 capa = req_capsule_server_get(pill, &RMF_CAPA1);
721 if (it->it_op & IT_OPEN) {
722 /* client fid capa will be checked in replay */
723 p = req_capsule_client_get(pill, &RMF_CAPA2);
728 if (body->valid & OBD_MD_FLOSSCAPA) {
729 struct lustre_capa *capa;
731 capa = req_capsule_server_get(pill, &RMF_CAPA2);
735 } else if (it->it_op & IT_LAYOUT) {
736 /* maybe the lock was granted right away and layout
737 * is packed into RMF_DLM_LVB of req */
738 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
740 lvb_data = req_capsule_server_sized_get(pill,
741 &RMF_DLM_LVB, lvb_len);
742 if (lvb_data == NULL)
747 /* fill in stripe data for layout lock */
748 lock = ldlm_handle2lock(lockh);
749 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
752 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
753 ldlm_it2str(it->it_op), lvb_len);
755 OBD_ALLOC_LARGE(lmm, lvb_len);
760 memcpy(lmm, lvb_data, lvb_len);
762 /* install lvb_data */
763 lock_res_and_lock(lock);
/* Only install if no LVB present yet; otherwise free our copy. */
764 if (lock->l_lvb_data == NULL) {
765 lock->l_lvb_type = LVB_T_LAYOUT;
766 lock->l_lvb_data = lmm;
767 lock->l_lvb_len = lvb_len;
770 unlock_res_and_lock(lock);
772 OBD_FREE_LARGE(lmm, lvb_len);
780 /* We always reserve enough space in the reply packet for a stripe MD, because
781 * we don't know in advance the file type. */
/* Main intent-enqueue entry: choose an inodebits policy from the intent
 * op, build the matching intent request, throttle via the rpc_lock and
 * request-slot accounting, issue ldlm_cli_enqueue(), retry -EINPROGRESS
 * creates indefinitely (unless the import generation changed, i.e.
 * eviction), and finish via mdc_finish_enqueue().  Resend/goto labels
 * are elided from this view. */
782 int mdc_enqueue(struct obd_export *exp,
783 struct ldlm_enqueue_info *einfo,
784 const union ldlm_policy_data *policy,
785 struct lookup_intent *it, struct md_op_data *op_data,
786 struct lustre_handle *lockh, __u64 extra_lock_flags)
788 struct obd_device *obddev = class_exp2obd(exp);
789 struct ptlrpc_request *req = NULL;
790 __u64 flags, saved_flags = extra_lock_flags;
792 struct ldlm_res_id res_id;
793 static const ldlm_policy_data_t lookup_policy =
794 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
795 static const ldlm_policy_data_t update_policy =
796 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
797 static const ldlm_policy_data_t layout_policy =
798 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
799 static const ldlm_policy_data_t getxattr_policy = {
800 .l_inodebits = { MDS_INODELOCK_XATTR } };
801 int generation, resends = 0;
802 struct ldlm_reply *lockrep;
803 enum lvb_type lvb_type = 0;
806 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
808 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* With an intent, the policy is derived from it_op, never passed in. */
811 LASSERT(policy == NULL);
813 saved_flags |= LDLM_FL_HAS_INTENT;
814 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
815 policy = &update_policy;
816 else if (it->it_op & IT_LAYOUT)
817 policy = &layout_policy;
818 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
819 policy = &getxattr_policy;
821 policy = &lookup_policy;
/* Remember the import generation to detect eviction across resends. */
824 generation = obddev->u.cli.cl_import->imp_generation;
828 /* The only way right now is FLOCK. */
829 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
831 res_id.name[3] = LDLM_FLOCK;
832 } else if (it->it_op & IT_OPEN) {
833 LASSERT(einfo->ei_cbdata == NULL);
834 req = mdc_intent_open_pack(exp, it, op_data);
835 } else if (it->it_op & IT_UNLINK) {
836 req = mdc_intent_unlink_pack(exp, it, op_data);
837 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
838 req = mdc_intent_getattr_pack(exp, it, op_data);
839 } else if (it->it_op & IT_READDIR) {
840 req = mdc_enqueue_pack(exp, 0);
841 } else if (it->it_op & IT_LAYOUT) {
842 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
844 req = mdc_intent_layout_pack(exp, it, op_data);
845 lvb_type = LVB_T_LAYOUT;
846 } else if (it->it_op & IT_GETXATTR) {
847 req = mdc_intent_getxattr_pack(exp, it, op_data);
854 RETURN(PTR_ERR(req));
856 if (req != NULL && it && it->it_op & IT_CREAT)
857 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
859 req->rq_no_retry_einprogress = 1;
/* On a resend, pin the request to the recorded import generation. */
862 req->rq_generation_set = 1;
863 req->rq_import_generation = generation;
864 req->rq_sent = cfs_time_current_sec() + resends;
867 /* It is important to obtain rpc_lock first (if applicable), so that
868 * threads that are serialised with rpc_lock are not polluting our
869 * rpcs in flight counter. We do not do flock request limiting, though*/
871 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
872 rc = obd_get_request_slot(&obddev->u.cli);
874 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
875 mdc_clear_replay_flag(req, 0);
876 ptlrpc_req_finished(req);
881 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
882 0, lvb_type, lockh, 0);
884 /* For flock requests we immediatelly return without further
885 delay and let caller deal with the rest, since rest of
886 this function metadata processing makes no sense for flock
887 requests anyway. But in case of problem during comms with
888 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
889 can not rely on caller and this mainly for F_UNLCKs
890 (explicits or automatically generated by Kernel to clean
891 current FLocks upon exit) that can't be trashed */
892 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
893 (einfo->ei_type == LDLM_FLOCK) &&
894 (einfo->ei_mode == LCK_NL))
/* Release throttling resources taken above, in reverse order. */
899 obd_put_request_slot(&obddev->u.cli);
900 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
903 CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
904 "%s: ldlm_cli_enqueue failed: rc = %d\n",
905 obddev->obd_name, rc);
907 mdc_clear_replay_flag(req, rc);
908 ptlrpc_req_finished(req);
912 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
913 LASSERT(lockrep != NULL);
/* Convert the wire status to the host error-number convention. */
915 lockrep->lock_policy_res2 =
916 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
918 /* Retry the create infinitely when we get -EINPROGRESS from
919 * server. This is required by the new quota design. */
920 if (it && it->it_op & IT_CREAT &&
921 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
922 mdc_clear_replay_flag(req, rc);
923 ptlrpc_req_finished(req);
926 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
927 obddev->obd_name, resends, it->it_op,
928 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
930 if (generation == obddev->u.cli.cl_import->imp_generation) {
933 CDEBUG(D_HA, "resend cross eviction\n");
938 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* On finish failure, drop any granted lock reference and the request. */
940 if (lustre_handle_is_used(lockh)) {
941 ldlm_lock_decref(lockh, einfo->ei_mode);
942 memset(lockh, 0, sizeof(*lockh));
944 ptlrpc_req_finished(req);
/* Translate a finished intent request into the state the VFS expects:
 * surface per-phase errors, detect stale fid/name revalidations, take
 * extra request references for successful create/open (balanced in
 * ll_create_node / ll_file_open), and collapse a duplicate lock against
 * an already-granted matching one.  Some branches (e.g. the stale-data
 * return) are elided from this view. */
949 static int mdc_finish_intent_lock(struct obd_export *exp,
950 struct ptlrpc_request *request,
951 struct md_op_data *op_data,
952 struct lookup_intent *it,
953 struct lustre_handle *lockh)
955 struct lustre_handle old_lock;
956 struct mdt_body *mdt_body;
957 struct ldlm_lock *lock;
961 LASSERT(request != NULL);
962 LASSERT(request != LP_POISON);
963 LASSERT(request->rq_repmsg != LP_POISON);
965 if (it->it_op & IT_READDIR)
968 if (!it_disposition(it, DISP_IT_EXECD)) {
969 /* The server failed before it even started executing the
970 * intent, i.e. because it couldn't unpack the request. */
971 LASSERT(it->d.lustre.it_status != 0);
972 RETURN(it->d.lustre.it_status);
974 rc = it_open_error(DISP_IT_EXECD, it);
978 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
979 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
981 /* If we were revalidating a fid/name pair, mark the intent in
982 * case we fail and get called again from lookup */
983 if (fid_is_sane(&op_data->op_fid2) &&
984 it->it_create_mode & M_CHECK_STALE &&
985 it->it_op != IT_GETATTR) {
986 /* Also: did we find the same inode? */
987 /* sever can return one of two fids:
988 * op_fid2 - new allocated fid - if file is created.
989 * op_fid3 - existent fid - if file only open.
990 * op_fid3 is saved in lmv_intent_open */
991 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
992 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
993 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
994 "\n", PFID(&op_data->op_fid2),
995 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
1000 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1004 /* keep requests around for the multiple phases of the call
1005 * this shows the DISP_XX must guarantee we make it into the call
1007 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1008 it_disposition(it, DISP_OPEN_CREATE) &&
1009 !it_open_error(DISP_OPEN_CREATE, it)) {
1010 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1011 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1013 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1014 it_disposition(it, DISP_OPEN_OPEN) &&
1015 !it_open_error(DISP_OPEN_OPEN, it)) {
1016 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1017 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1018 /* BUG 11546 - eviction in the middle of open rpc processing */
1019 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1022 if (it->it_op & IT_CREAT) {
1023 /* XXX this belongs in ll_create_it */
1024 } else if (it->it_op == IT_OPEN) {
1025 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1027 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1030 /* If we already have a matching lock, then cancel the new
1031 * one. We have to set the data here instead of in
1032 * mdc_enqueue, because we need to use the child's inode as
1033 * the l_ast_data to match, and that's not available until
1034 * intent_finish has performed the iget().) */
1035 lock = ldlm_handle2lock(lockh);
1037 ldlm_policy_data_t policy = lock->l_policy_data;
1038 LDLM_DEBUG(lock, "matching against this");
/* The lock's resource must correspond to the fid the server returned. */
1040 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1041 &lock->l_resource->lr_name),
1042 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1043 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1044 LDLM_LOCK_PUT(lock);
1046 memcpy(&old_lock, lockh, sizeof(*lockh));
1047 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1048 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Duplicate found: drop the fresh lock, keep the existing one. */
1049 ldlm_lock_decref_and_cancel(lockh,
1050 it->d.lustre.it_lock_mode);
1051 memcpy(lockh, &old_lock, sizeof(old_lock));
1052 it->d.lustre.it_lock_handle = lockh->cookie;
1055 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1056 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1057 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Check whether the client still holds a lock covering @it on @fid.
 * If the intent already carries a lock handle, revalidate that handle;
 * otherwise match against the namespace with an inodebits policy chosen
 * by it_op (the switch-case labels are elided from this view).  On
 * success the handle/mode are stored back into the intent; on failure
 * both are zeroed. */
1061 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1062 struct lu_fid *fid, __u64 *bits)
1064 /* We could just return 1 immediately, but since we should only
1065 * be called in revalidate_it if we already have a lock, let's
1067 struct ldlm_res_id res_id;
1068 struct lustre_handle lockh;
1069 ldlm_policy_data_t policy;
1073 if (it->d.lustre.it_lock_handle) {
1074 lockh.cookie = it->d.lustre.it_lock_handle;
1075 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1077 fid_build_reg_res_name(fid, &res_id);
1078 switch (it->it_op) {
1080 /* File attributes are held under multiple bits:
1081 * nlink is under lookup lock, size and times are
1082 * under UPDATE lock and recently we've also got
1083 * a separate permissions lock for owner/group/acl that
1084 * were protected by lookup lock before.
1085 * Getattr must provide all of that information,
1086 * so we need to ensure we have all of those locks.
1087 * Unfortunately, if the bits are split across multiple
1088 * locks, there's no easy way to match all of them here,
1089 * so an extra RPC would be performed to fetch all
1090 * of those bits at once for now. */
1091 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1092 * but for old MDTs (< 2.4), permission is covered
1093 * by LOOKUP lock, so it needs to match all bits here.*/
1094 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1095 MDS_INODELOCK_LOOKUP |
1099 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1102 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1105 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1109 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1110 LDLM_IBITS, &policy,
1111 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
/* Record (or clear) the matched lock in the intent. */
1116 it->d.lustre.it_lock_handle = lockh.cookie;
1117 it->d.lustre.it_lock_mode = mode;
1119 it->d.lustre.it_lock_handle = 0;
1120 it->d.lustre.it_lock_mode = 0;
1127 * This long block is all about fixing up the lock and request state
1128 * so that it is correct as of the moment _before_ the operation was
1129 * applied; that way, the VFS will think that everything is normal and
1130 * call Lustre's regular VFS methods.
1132 * If we're performing a creation, that means that unless the creation
1133 * failed with EEXIST, we should fake up a negative dentry.
1135 * For everything else, we want to lookup to succeed.
1137 * One additional note: if CREATE or OPEN succeeded, we add an extra
1138 * reference to the request because we need to keep it around until
1139 * ll_create/ll_open gets called.
1141 * The server will return to us, in it_disposition, an indication of
1142 * exactly what d.lustre.it_status refers to.
1144 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1145 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1146 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1147 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1150 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1153 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1154 struct lookup_intent *it, struct ptlrpc_request **reqp,
1155 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1157 struct ldlm_enqueue_info einfo = {
1158 .ei_type = LDLM_IBITS,
1159 .ei_mode = it_to_lock_mode(it),
1160 .ei_cb_bl = cb_blocking,
1161 .ei_cb_cp = ldlm_completion_ast,
1163 struct lustre_handle lockh;
1168 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1169 ", intent: %s flags %#"LPF64"o\n", op_data->op_namelen,
1170 op_data->op_name, PFID(&op_data->op_fid2),
1171 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Revalidate path: try to reuse an existing lock before enqueueing. */
1175 if (fid_is_sane(&op_data->op_fid2) &&
1176 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1177 /* We could just return 1 immediately, but since we should only
1178 * be called in revalidate_it if we already have a lock, let's
1180 it->d.lustre.it_lock_handle = 0;
1181 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1182 /* Only return failure if it was not GETATTR by cfid
1183 (from inode_revalidate) */
1184 if (rc || op_data->op_namelen != 0)
1188 /* For case if upper layer did not alloc fid, do it now. */
1189 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1190 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1192 CERROR("Can't alloc new fid, rc %d\n", rc);
1197 rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
/* Hand the reply request back to the caller, then finish the intent. */
1202 *reqp = it->d.lustre.it_data;
1203 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* ptlrpc interpret callback for the async getattr enqueue started in
 * mdc_intent_getattr_async(): release the request slot, complete the
 * enqueue via ldlm_cli_enqueue_fini(), run mdc_finish_enqueue() /
 * mdc_finish_intent_lock(), free the einfo allocated by the caller and
 * invoke the caller's mi_cb with the final rc.  (Exit labels are elided
 * from this view.) */
1207 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1208 struct ptlrpc_request *req,
1211 struct mdc_getattr_args *ga = args;
1212 struct obd_export *exp = ga->ga_exp;
1213 struct md_enqueue_info *minfo = ga->ga_minfo;
1214 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1215 struct lookup_intent *it;
1216 struct lustre_handle *lockh;
1217 struct obd_device *obddev;
1218 struct ldlm_reply *lockrep;
1219 __u64 flags = LDLM_FL_HAS_INTENT;
1223 lockh = &minfo->mi_lockh;
1225 obddev = class_exp2obd(exp);
/* Balance the slot taken in mdc_intent_getattr_async(). */
1227 obd_put_request_slot(&obddev->u.cli);
1228 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1231 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1232 &flags, NULL, 0, lockh, rc);
1234 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1235 mdc_clear_replay_flag(req, rc);
1239 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1240 LASSERT(lockrep != NULL);
/* Convert the wire status to the host error-number convention. */
1242 lockrep->lock_policy_res2 =
1243 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1245 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1249 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1253 OBD_FREE_PTR(einfo);
1254 minfo->mi_cb(req, minfo, rc);
1258 int mdc_intent_getattr_async(struct obd_export *exp,
1259 struct md_enqueue_info *minfo,
1260 struct ldlm_enqueue_info *einfo)
1262 struct md_op_data *op_data = &minfo->mi_data;
1263 struct lookup_intent *it = &minfo->mi_it;
1264 struct ptlrpc_request *req;
1265 struct mdc_getattr_args *ga;
1266 struct obd_device *obddev = class_exp2obd(exp);
1267 struct ldlm_res_id res_id;
1268 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1269 * for statahead currently. Consider CMD in future, such two bits
1270 * maybe managed by different MDS, should be adjusted then. */
1271 ldlm_policy_data_t policy = {
1272 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1273 MDS_INODELOCK_UPDATE }
1276 __u64 flags = LDLM_FL_HAS_INTENT;
1279 CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1281 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1282 ldlm_it2str(it->it_op), it->it_flags);
1284 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1285 req = mdc_intent_getattr_pack(exp, it, op_data);
1287 RETURN(PTR_ERR(req));
1289 rc = obd_get_request_slot(&obddev->u.cli);
1291 ptlrpc_req_finished(req);
1295 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1296 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1298 obd_put_request_slot(&obddev->u.cli);
1299 ptlrpc_req_finished(req);
1303 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1304 ga = ptlrpc_req_async_args(req);
1306 ga->ga_minfo = minfo;
1307 ga->ga_einfo = einfo;
1309 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1310 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);