4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
43 # include <liblustre.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
/* Context carried through ptlrpc async args from mdc_intent_getattr_async()
 * to mdc_intent_getattr_async_interpret(); a CLASSERT later in this file
 * checks it fits inside req->rq_async_args. */
55 struct mdc_getattr_args {
56 struct obd_export *ga_exp;	/* export the enqueue was sent on */
57 struct md_enqueue_info *ga_minfo;	/* caller's enqueue info; mi_cb is invoked on completion */
58 struct ldlm_enqueue_info *ga_einfo;	/* freed (OBD_FREE_PTR) by the interpret callback */
/* Return the saved server status (it_status) for the first disposition bit
 * that is set in the intent and is at or beyond the requested phase.
 * Dispositions are checked in the order the server executes them:
 * LEASE, OPEN, CREATE, LOOKUP_EXECD, IT_EXECD.  If none of the expected
 * disposition bits is set the CERROR at the bottom reports the raw
 * disposition/status.  (NOTE(review): the fall-through return for that
 * error path is in an elided line — confirm against the full source.) */
61 int it_open_error(int phase, struct lookup_intent *it)
63 if (it_disposition(it, DISP_OPEN_LEASE)) {
64 if (phase >= DISP_OPEN_LEASE)
65 return it->d.lustre.it_status;
69 if (it_disposition(it, DISP_OPEN_OPEN)) {
70 if (phase >= DISP_OPEN_OPEN)
71 return it->d.lustre.it_status;
76 if (it_disposition(it, DISP_OPEN_CREATE)) {
77 if (phase >= DISP_OPEN_CREATE)
78 return it->d.lustre.it_status;
83 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
84 if (phase >= DISP_LOOKUP_EXECD)
85 return it->d.lustre.it_status;
90 if (it_disposition(it, DISP_IT_EXECD)) {
91 if (phase >= DISP_IT_EXECD)
92 return it->d.lustre.it_status;
96 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
97 it->d.lustre.it_status);
101 EXPORT_SYMBOL(it_open_error);
103 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach @data (an inode) to the lock's resource as its LVB inode, under
 * the resource lock.  Replacing a different, already-attached inode is only
 * legal if that old inode is being freed (I_FREEING), which the LASSERTF
 * enforces.  If the caller passed a bits pointer it receives the lock's
 * inodebits (NOTE(review): the bits != NULL guard appears to be on an
 * elided line — confirm). */
104 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
107 struct ldlm_lock *lock;
108 struct inode *new_inode = data;
117 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
119 LASSERT(lock != NULL);
120 lock_res_and_lock(lock);
122 if (lock->l_resource->lr_lvb_inode &&
123 lock->l_resource->lr_lvb_inode != data) {
124 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
125 LASSERTF(old_inode->i_state & I_FREEING,
126 "Found existing inode %p/%lu/%u state %lu in lock: "
127 "setting data to %p/%lu/%u\n", old_inode,
128 old_inode->i_ino, old_inode->i_generation,
130 new_inode, new_inode->i_ino, new_inode->i_generation);
133 lock->l_resource->lr_lvb_inode = new_inode;
135 *bits = lock->l_policy_data.l_inodebits.bits;
137 unlock_res_and_lock(lock);
/* Look in the client-side namespace for an already-granted lock matching
 * @fid/@type/@policy/@mode.  Returns the matched mode (0 if none) and fills
 * @lockh.  Inodebits the server does not support are masked out first so we
 * never demand more bits than the server could have granted (LU-4405). */
143 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
144 const struct lu_fid *fid, ldlm_type_t type,
145 ldlm_policy_data_t *policy, ldlm_mode_t mode,
146 struct lustre_handle *lockh)
148 struct ldlm_res_id res_id;
152 fid_build_reg_res_name(fid, &res_id);
153 /* LU-4405: Clear bits not supported by server */
154 policy->l_inodebits.bits &= exp_connect_ibits(exp);
155 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
156 &res_id, type, policy, mode, lockh, 0);
/* Cancel all unused locks on the resource derived from @fid that match
 * @policy/@mode, delegating to the generic ldlm resource-wide cancel.
 * @opaque is passed through to match lock ast data. */
160 int mdc_cancel_unused(struct obd_export *exp,
161 const struct lu_fid *fid,
162 ldlm_policy_data_t *policy,
164 ldlm_cancel_flags_t flags,
167 struct ldlm_res_id res_id;
168 struct obd_device *obd = class_exp2obd(exp);
173 fid_build_reg_res_name(fid, &res_id);
174 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
175 policy, mode, flags, opaque);
/* Detach the LVB inode from the resource for @fid, if that resource exists
 * in the client namespace.  Used when the inode is going away so the
 * resource no longer points at freed memory.  The resource reference taken
 * by ldlm_resource_get() is dropped before returning. */
179 int mdc_null_inode(struct obd_export *exp,
180 const struct lu_fid *fid)
182 struct ldlm_res_id res_id;
183 struct ldlm_resource *res;
184 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
187 LASSERTF(ns != NULL, "no namespace passed\n");
189 fid_build_reg_res_name(fid, &res_id);
191 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* NOTE(review): the lock/unlock of the resource around this store is on
 * elided lines — confirm lr_lvb_inode is cleared under lr_lock. */
196 res->lr_lvb_inode = NULL;
199 ldlm_resource_putref(res);
203 /* find any ldlm lock of the inode in mdc
/* Iterate the locks on @fid's resource with callback @it/@data; maps the
 * iterator result to the function's return convention: LDLM_ITER_STOP
 * means a matching lock was found, LDLM_ITER_CONTINUE means none was
 * (exact return values are on elided lines — presumably 1 / 0). */
207 int mdc_find_cbdata(struct obd_export *exp,
208 const struct lu_fid *fid,
209 ldlm_iterator_t it, void *data)
211 struct ldlm_res_id res_id;
215 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
216 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
218 if (rc == LDLM_ITER_STOP)
220 else if (rc == LDLM_ITER_CONTINUE)
/* Drop the replay flag of @req (under rq_lock) so a failed request is not
 * kept for replay, and complain loudly if the server assigned a transno to
 * a request that nonetheless failed — that combination should not happen. */
225 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
227 /* Don't hold error requests for replay. */
228 if (req->rq_replay) {
229 spin_lock(&req->rq_lock);
231 spin_unlock(&req->rq_lock);
233 if (rc && req->rq_transno != 0) {
234 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
239 /* Save a large LOV EA into the request buffer so that it is available
240 * for replay. We don't do this in the initial request because the
241 * original request doesn't need this buffer (at most it sends just the
242 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
243 * buffer and may also be difficult to allocate and save a very large
244 * request buffer for each open. (bug 5707)
246 * OOM here may cause recovery failure if lmm is needed (only for the
247 * original open if the MDS crashed just when this client also OOM'd)
248 * but this is incredibly unlikely, and questionable whether the client
249 * could do MDS recovery under OOM anyways... */
250 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
251 struct mdt_body *body)
255 /* FIXME: remove this explicit offset. */
256 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
/* On enlarge failure we degrade gracefully: forget the EA rather than
 * fail the open — replay will simply not carry the LOV EA. */
259 CERROR("Can't enlarge segment %d size to %d\n",
260 DLM_INTENT_REC_OFF + 4, body->eadatasize);
261 body->valid &= ~OBD_MD_FLEASIZE;
262 body->eadatasize = 0;
/* Build an LDLM_INTENT_OPEN enqueue request: gather conflicting locks to
 * cancel (child OPEN locks when the child fid is known, parent UPDATE lock
 * for CREATE), allocate and size the request capsule, mark it replayable,
 * and pack the ldlm intent plus the open RPC body.  Returns the prepared
 * request or ERR_PTR on allocation/prep failure. */
266 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
267 struct lookup_intent *it,
268 struct md_op_data *op_data,
269 void *lmm, int lmmsize,
272 struct ptlrpc_request *req;
273 struct obd_device *obddev = class_exp2obd(exp);
274 struct ldlm_intent *lit;
275 CFS_LIST_HEAD(cancels);
/* Open is always for a regular file from the MDS's point of view. */
281 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
283 /* XXX: openlock is not cancelled for cross-refs. */
284 /* If inode is known, cancel conflicting OPEN locks. */
285 if (fid_is_sane(&op_data->op_fid2)) {
286 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
287 if (it->it_flags & FMODE_WRITE)
292 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
295 else if (it->it_flags & FMODE_EXEC)
/* NOTE(review): the mode chosen per branch is on elided lines. */
301 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
306 /* If CREATE, cancel parent's UPDATE lock. */
307 if (it->it_op & IT_CREAT)
311 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
313 MDS_INODELOCK_UPDATE);
315 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
316 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: give back the cancel list we collected. */
318 ldlm_lock_list_put(&cancels, l_bl_ast, count);
319 RETURN(ERR_PTR(-ENOMEM));
322 /* parent capability */
323 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
324 /* child capability, reserve the size according to parent capa, it will
325 * be filled after we get the reply */
326 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
328 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
329 op_data->op_namelen + 1);
330 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
331 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
333 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
335 ptlrpc_request_free(req);
/* Opens are replayed unconditionally if the import is replayable;
 * mdc_finish_enqueue() clears this again for unsuccessful opens. */
339 spin_lock(&req->rq_lock);
340 req->rq_replay = req->rq_import->imp_replayable;
341 spin_unlock(&req->rq_lock);
343 /* pack the intent */
344 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
345 lit->opc = (__u64)it->it_op;
347 /* pack the intended request */
348 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
/* Reply must have room for the largest possible stripe MD. */
351 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
352 obddev->u.cli.cl_max_mds_easize);
354 /* for remote client, fetch remote perm for current user */
355 if (client_is_remote(exp))
356 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
357 sizeof(struct mdt_remote_perm));
358 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETXATTR enqueue request.  All three xattr reply
 * buffers (names, values, value lengths) are sized to the server's
 * advertised maximum EA size so any xattr set fits in a single reply.
 * Returns the prepared request or ERR_PTR on failure. */
362 static struct ptlrpc_request *
363 mdc_intent_getxattr_pack(struct obd_export *exp,
364 struct lookup_intent *it,
365 struct md_op_data *op_data)
367 struct ptlrpc_request *req;
368 struct ldlm_intent *lit;
369 int rc, count = 0, maxdata;
370 CFS_LIST_HEAD(cancels);
374 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
375 &RQF_LDLM_INTENT_GETXATTR);
377 RETURN(ERR_PTR(-ENOMEM));
379 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
381 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
383 ptlrpc_request_free(req);
387 /* pack the intent */
388 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
389 lit->opc = IT_GETXATTR;
391 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
393 /* pack the intended request */
394 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
395 op_data->op_valid, maxdata, -1, 0);
397 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
398 RCL_SERVER, maxdata);
400 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
401 RCL_SERVER, maxdata);
403 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
404 RCL_SERVER, maxdata);
406 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_UNLINK enqueue request: pack the intent opcode and
 * the unlink RPC body, and reserve reply room for the dead object's stripe
 * MD and unlink cookies.  Returns the prepared request or ERR_PTR. */
411 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
412 struct lookup_intent *it,
413 struct md_op_data *op_data)
415 struct ptlrpc_request *req;
416 struct obd_device *obddev = class_exp2obd(exp);
417 struct ldlm_intent *lit;
421 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
422 &RQF_LDLM_INTENT_UNLINK);
424 RETURN(ERR_PTR(-ENOMEM));
426 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
427 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
428 op_data->op_namelen + 1);
430 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
432 ptlrpc_request_free(req);
436 /* pack the intent */
437 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
438 lit->opc = (__u64)it->it_op;
440 /* pack the intended request */
441 mdc_unlink_pack(req, op_data);
443 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
444 obddev->u.cli.cl_max_mds_easize);
445 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
446 obddev->u.cli.cl_max_mds_cookiesize);
447 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETATTR enqueue request.  The valid mask asks for
 * attributes, EA, MDS capability and either remote permissions (remote
 * client) or ACLs (local client).  Reply buffers are sized for the largest
 * stripe MD and, for remote clients, an mdt_remote_perm.  Returns the
 * prepared request or ERR_PTR. */
451 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
452 struct lookup_intent *it,
453 struct md_op_data *op_data)
455 struct ptlrpc_request *req;
456 struct obd_device *obddev = class_exp2obd(exp);
457 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
458 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
459 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
460 (client_is_remote(exp) ?
461 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
462 struct ldlm_intent *lit;
466 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
467 &RQF_LDLM_INTENT_GETATTR);
469 RETURN(ERR_PTR(-ENOMEM));
471 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
472 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
473 op_data->op_namelen + 1);
475 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
477 ptlrpc_request_free(req);
481 /* pack the intent */
482 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
483 lit->opc = (__u64)it->it_op;
485 /* pack the intended request */
486 mdc_getattr_pack(req, valid, it->it_flags, op_data,
487 obddev->u.cli.cl_max_mds_easize);
489 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
490 obddev->u.cli.cl_max_mds_easize);
491 if (client_is_remote(exp))
492 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
493 sizeof(struct mdt_remote_perm));
494 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_LAYOUT enqueue request: generic LAYOUT_INTENT_ACCESS
 * intent plus a DLM LVB reply buffer large enough for the biggest layout
 * the MDS can return.  @unused: op_data is not needed for layout intents.
 * Returns the prepared request or ERR_PTR. */
498 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
499 struct lookup_intent *it,
500 struct md_op_data *unused)
502 struct obd_device *obd = class_exp2obd(exp);
503 struct ptlrpc_request *req;
504 struct ldlm_intent *lit;
505 struct layout_intent *layout;
509 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
510 &RQF_LDLM_INTENT_LAYOUT);
512 RETURN(ERR_PTR(-ENOMEM));
514 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
515 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
517 ptlrpc_request_free(req);
521 /* pack the intent */
522 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
523 lit->opc = (__u64)it->it_op;
525 /* pack the layout intent request */
526 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
527 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
528 * set for replication */
529 layout->li_opc = LAYOUT_INTENT_ACCESS;
531 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
532 obd->u.cli.cl_max_mds_easize);
533 ptlrpc_request_set_replen(req);
/* Build a plain (intent-less) LDLM_ENQUEUE request with an @lvb_len-byte
 * server LVB buffer.  Used e.g. for IT_READDIR which needs no intent body.
 * Returns the prepared request or ERR_PTR. */
537 static struct ptlrpc_request *
538 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
540 struct ptlrpc_request *req;
544 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
546 RETURN(ERR_PTR(-ENOMEM));
548 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
550 ptlrpc_request_free(req);
554 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
555 ptlrpc_request_set_replen(req);
/* Post-process a completed intent enqueue reply:
 *  - mark replayed/committed requests INTENT_ONLY so replay re-runs the
 *    intent without acquiring a lock;
 *  - on ELDLM_LOCK_ABORTED (intent executed, no lock granted) zero lockh,
 *    otherwise adjust refcounts if the server granted a different mode;
 *  - copy disposition/status/lock info from the DLM reply into the intent;
 *  - clear the replay flag for failed requests and for opens that did not
 *    actually succeed;
 *  - swab and validate the mdt_body, save the reply LOV EA back into the
 *    request buffer for open replay (reallocating the buffer if needed),
 *    and handle remote-perm / capability reply fields;
 *  - for IT_LAYOUT, take the layout from RMF_DLM_LVB and install a copy as
 *    the lock's LVB (freeing our copy if the lock already has one). */
559 static int mdc_finish_enqueue(struct obd_export *exp,
560 struct ptlrpc_request *req,
561 struct ldlm_enqueue_info *einfo,
562 struct lookup_intent *it,
563 struct lustre_handle *lockh,
566 struct req_capsule *pill = &req->rq_pill;
567 struct ldlm_request *lockreq;
568 struct ldlm_reply *lockrep;
569 struct lustre_intent_data *intent = &it->d.lustre;
570 struct ldlm_lock *lock;
571 void *lvb_data = NULL;
576 /* Similarly, if we're going to replay this request, we don't want to
577 * actually get a lock, just perform the intent. */
578 if (req->rq_transno || req->rq_replay) {
579 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
580 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
583 if (rc == ELDLM_LOCK_ABORTED) {
585 memset(lockh, 0, sizeof(*lockh));
587 } else { /* rc = 0 */
588 lock = ldlm_handle2lock(lockh);
589 LASSERT(lock != NULL);
591 /* If the server gave us back a different lock mode, we should
592 * fix up our variables. */
593 if (lock->l_req_mode != einfo->ei_mode) {
594 ldlm_lock_addref(lockh, lock->l_req_mode);
595 ldlm_lock_decref(lockh, einfo->ei_mode);
596 einfo->ei_mode = lock->l_req_mode;
601 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
602 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
604 intent->it_disposition = (int)lockrep->lock_policy_res1;
605 intent->it_status = (int)lockrep->lock_policy_res2;
606 intent->it_lock_mode = einfo->ei_mode;
607 intent->it_lock_handle = lockh->cookie;
608 intent->it_data = req;
610 /* Technically speaking rq_transno must already be zero if
611 * it_status is in error, so the check is a bit redundant */
612 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
613 mdc_clear_replay_flag(req, intent->it_status)
615 /* If we're doing an IT_OPEN which did not result in an actual
616 * successful open, then we need to remove the bit which saves
617 * this request for unconditional replay.
619 * It's important that we do this first! Otherwise we might exit the
620 * function without doing so, and try to replay a failed create
622 if (it->it_op & IT_OPEN && req->rq_replay &&
623 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
624 mdc_clear_replay_flag(req, intent->it_status);
626 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
627 it->it_op, intent->it_disposition, intent->it_status);
629 /* We know what to expect, so we do any byte flipping required here */
630 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
631 struct mdt_body *body;
633 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
635 CERROR ("Can't swab mdt_body\n");
639 if (it_disposition(it, DISP_OPEN_OPEN) &&
640 !it_open_error(DISP_OPEN_OPEN, it)) {
642 * If this is a successful OPEN request, we need to set
643 * replay handler and data early, so that if replay
644 * happens immediately after swabbing below, new reply
645 * is swabbed by that handler correctly.
647 mdc_set_open_replay_data(NULL, NULL, it);
650 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
653 mdc_update_max_ea_from_body(exp, body);
656 * The eadata is opaque; just check that it is there.
657 * Eventually, obd_unpackmd() will check the contents.
659 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
664 /* save lvb data and length in case this is for layout
667 lvb_len = body->eadatasize;
670 * We save the reply LOV EA in case we have to replay a
671 * create for recovery. If we didn't allocate a large
672 * enough request buffer above we need to reallocate it
673 * here to hold the actual LOV EA.
675 * To not save LOV EA if request is not going to replay
676 * (for example error one).
678 if ((it->it_op & IT_OPEN) && req->rq_replay) {
680 if (req_capsule_get_size(pill, &RMF_EADATA,
683 mdc_realloc_openmsg(req, body);
685 req_capsule_shrink(pill, &RMF_EADATA,
689 req_capsule_set_size(pill, &RMF_EADATA,
693 lmm = req_capsule_client_get(pill, &RMF_EADATA);
695 memcpy(lmm, eadata, body->eadatasize);
699 if (body->valid & OBD_MD_FLRMTPERM) {
700 struct mdt_remote_perm *perm;
702 LASSERT(client_is_remote(exp));
703 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
704 lustre_swab_mdt_remote_perm);
708 if (body->valid & OBD_MD_FLMDSCAPA) {
709 struct lustre_capa *capa, *p;
711 capa = req_capsule_server_get(pill, &RMF_CAPA1);
715 if (it->it_op & IT_OPEN) {
716 /* client fid capa will be checked in replay */
717 p = req_capsule_client_get(pill, &RMF_CAPA2);
722 if (body->valid & OBD_MD_FLOSSCAPA) {
723 struct lustre_capa *capa;
725 capa = req_capsule_server_get(pill, &RMF_CAPA2);
729 } else if (it->it_op & IT_LAYOUT) {
730 /* maybe the lock was granted right away and layout
731 * is packed into RMF_DLM_LVB of req */
732 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
734 lvb_data = req_capsule_server_sized_get(pill,
735 &RMF_DLM_LVB, lvb_len);
736 if (lvb_data == NULL)
741 /* fill in stripe data for layout lock */
742 lock = ldlm_handle2lock(lockh);
743 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
746 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
747 ldlm_it2str(it->it_op), lvb_len);
749 OBD_ALLOC_LARGE(lmm, lvb_len);
754 memcpy(lmm, lvb_data, lvb_len);
756 /* install lvb_data */
757 lock_res_and_lock(lock);
758 if (lock->l_lvb_data == NULL) {
759 lock->l_lvb_type = LVB_T_LAYOUT;
760 lock->l_lvb_data = lmm;
761 lock->l_lvb_len = lvb_len;
764 unlock_res_and_lock(lock);
/* Someone beat us to installing an LVB: drop our copy. */
766 OBD_FREE_LARGE(lmm, lvb_len);
774 /* We always reserve enough space in the reply packet for a stripe MD, because
775 * we don't know in advance the file type. */
/* Send an intent enqueue to the MDS and wait for the reply.
 *  - choose the inodebits policy from the intent op (UPDATE for unlink/
 *    getattr/readdir, LAYOUT, XATTR, otherwise LOOKUP);
 *  - build the request via the per-intent pack helper (or treat a NULL
 *    intent with lmm set as an FLOCK enqueue, where lmm smuggles the flock
 *    policy and lmmsize is 0);
 *  - serialize through the mdc rpc_lock and in-flight counter, call
 *    ldlm_cli_enqueue(), then undo the serialization;
 *  - flock enqueues interrupted by EINTR/ETIMEDOUT return immediately;
 *  - IT_CREAT resends itself forever on -EINPROGRESS (quota design),
 *    aborting only across an import generation change (eviction);
 *  - on success hand the reply to mdc_finish_enqueue(); if that fails,
 *    release the lock reference and the request. */
776 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
777 struct lookup_intent *it, struct md_op_data *op_data,
778 struct lustre_handle *lockh, void *lmm, int lmmsize,
779 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
781 struct obd_device *obddev = class_exp2obd(exp);
782 struct ptlrpc_request *req = NULL;
783 __u64 flags, saved_flags = extra_lock_flags;
785 struct ldlm_res_id res_id;
786 static const ldlm_policy_data_t lookup_policy =
787 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
788 static const ldlm_policy_data_t update_policy =
789 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
790 static const ldlm_policy_data_t layout_policy =
791 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
792 static const ldlm_policy_data_t getxattr_policy = {
793 .l_inodebits = { MDS_INODELOCK_XATTR } };
794 ldlm_policy_data_t const *policy = &lookup_policy;
795 int generation, resends = 0;
796 struct ldlm_reply *lockrep;
797 enum lvb_type lvb_type = 0;
800 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
803 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
806 saved_flags |= LDLM_FL_HAS_INTENT;
807 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
808 policy = &update_policy;
809 else if (it->it_op & IT_LAYOUT)
810 policy = &layout_policy;
811 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
812 policy = &getxattr_policy;
815 LASSERT(reqp == NULL);
817 generation = obddev->u.cli.cl_import->imp_generation;
821 /* The only way right now is FLOCK, in this case we hide flock
822 policy as lmm, but lmmsize is 0 */
823 LASSERT(lmm && lmmsize == 0);
824 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
826 policy = (ldlm_policy_data_t *)lmm;
827 res_id.name[3] = LDLM_FLOCK;
828 } else if (it->it_op & IT_OPEN) {
829 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
831 policy = &update_policy;
832 einfo->ei_cbdata = NULL;
834 } else if (it->it_op & IT_UNLINK) {
835 req = mdc_intent_unlink_pack(exp, it, op_data);
836 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
837 req = mdc_intent_getattr_pack(exp, it, op_data);
838 } else if (it->it_op & IT_READDIR) {
839 req = mdc_enqueue_pack(exp, 0);
840 } else if (it->it_op & IT_LAYOUT) {
841 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
843 req = mdc_intent_layout_pack(exp, it, op_data);
844 lvb_type = LVB_T_LAYOUT;
845 } else if (it->it_op & IT_GETXATTR) {
846 req = mdc_intent_getxattr_pack(exp, it, op_data);
853 RETURN(PTR_ERR(req));
855 if (req != NULL && it && it->it_op & IT_CREAT)
856 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
858 req->rq_no_retry_einprogress = 1;
/* Resend path: keep the original import generation and delay the send
 * by one second per resend so we don't hammer the server. */
861 req->rq_generation_set = 1;
862 req->rq_import_generation = generation;
863 req->rq_sent = cfs_time_current_sec() + resends;
866 /* It is important to obtain rpc_lock first (if applicable), so that
867 * threads that are serialised with rpc_lock are not polluting our
868 * rpcs in flight counter. We do not do flock request limiting, though*/
870 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
871 rc = mdc_enter_request(&obddev->u.cli);
873 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
874 mdc_clear_replay_flag(req, 0);
875 ptlrpc_req_finished(req);
880 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
881 0, lvb_type, lockh, 0);
883 /* For flock requests we immediatelly return without further
884 delay and let caller deal with the rest, since rest of
885 this function metadata processing makes no sense for flock
886 requests anyway. But in case of problem during comms with
887 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
888 can not rely on caller and this mainly for F_UNLCKs
889 (explicits or automatically generated by Kernel to clean
890 current FLocks upon exit) that can't be trashed */
891 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
892 (einfo->ei_type == LDLM_FLOCK) &&
893 (einfo->ei_mode == LCK_NL))
898 mdc_exit_request(&obddev->u.cli);
899 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
902 CERROR("ldlm_cli_enqueue: %d\n", rc);
903 mdc_clear_replay_flag(req, rc);
904 ptlrpc_req_finished(req);
908 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
909 LASSERT(lockrep != NULL);
/* Intent status travels in lock_policy_res2; convert from wire. */
911 lockrep->lock_policy_res2 =
912 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
914 /* Retry the create infinitely when we get -EINPROGRESS from
915 * server. This is required by the new quota design. */
916 if (it && it->it_op & IT_CREAT &&
917 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
918 mdc_clear_replay_flag(req, rc);
919 ptlrpc_req_finished(req);
922 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
923 obddev->obd_name, resends, it->it_op,
924 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
926 if (generation == obddev->u.cli.cl_import->imp_generation) {
929 CDEBUG(D_HA, "resend cross eviction\n");
934 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
936 if (lustre_handle_is_used(lockh)) {
937 ldlm_lock_decref(lockh, einfo->ei_mode);
938 memset(lockh, 0, sizeof(*lockh));
940 ptlrpc_req_finished(req);
/* Turn a completed intent enqueue into VFS-visible state:
 *  - fail fast if the server never executed the intent;
 *  - detect stale revalidation (fid we asked about no longer matches the
 *    fid the server returned);
 *  - take extra request references for successful CREATE/OPEN so the
 *    reply survives until ll_create_node/ll_file_open consume it;
 *  - if an equivalent lock already exists locally, cancel the new one and
 *    switch the intent to the old handle. */
945 static int mdc_finish_intent_lock(struct obd_export *exp,
946 struct ptlrpc_request *request,
947 struct md_op_data *op_data,
948 struct lookup_intent *it,
949 struct lustre_handle *lockh)
951 struct lustre_handle old_lock;
952 struct mdt_body *mdt_body;
953 struct ldlm_lock *lock;
957 LASSERT(request != NULL);
958 LASSERT(request != LP_POISON);
959 LASSERT(request->rq_repmsg != LP_POISON);
961 if (it->it_op & IT_READDIR)
964 if (!it_disposition(it, DISP_IT_EXECD)) {
965 /* The server failed before it even started executing the
966 * intent, i.e. because it couldn't unpack the request. */
967 LASSERT(it->d.lustre.it_status != 0);
968 RETURN(it->d.lustre.it_status);
970 rc = it_open_error(DISP_IT_EXECD, it);
974 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
975 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
977 /* If we were revalidating a fid/name pair, mark the intent in
978 * case we fail and get called again from lookup */
979 if (fid_is_sane(&op_data->op_fid2) &&
980 it->it_create_mode & M_CHECK_STALE &&
981 it->it_op != IT_GETATTR) {
982 /* Also: did we find the same inode? */
983 /* sever can return one of two fids:
984 * op_fid2 - new allocated fid - if file is created.
985 * op_fid3 - existent fid - if file only open.
986 * op_fid3 is saved in lmv_intent_open */
987 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
988 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
989 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
990 "\n", PFID(&op_data->op_fid2),
991 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
996 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1000 /* keep requests around for the multiple phases of the call
1001 * this shows the DISP_XX must guarantee we make it into the call
1003 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1004 it_disposition(it, DISP_OPEN_CREATE) &&
1005 !it_open_error(DISP_OPEN_CREATE, it)) {
1006 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1007 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1009 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1010 it_disposition(it, DISP_OPEN_OPEN) &&
1011 !it_open_error(DISP_OPEN_OPEN, it)) {
1012 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1013 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1014 /* BUG 11546 - eviction in the middle of open rpc processing */
1015 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1018 if (it->it_op & IT_CREAT) {
1019 /* XXX this belongs in ll_create_it */
1020 } else if (it->it_op == IT_OPEN) {
1021 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1023 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1026 /* If we already have a matching lock, then cancel the new
1027 * one. We have to set the data here instead of in
1028 * mdc_enqueue, because we need to use the child's inode as
1029 * the l_ast_data to match, and that's not available until
1030 * intent_finish has performed the iget().) */
1031 lock = ldlm_handle2lock(lockh)
1033 ldlm_policy_data_t policy = lock->l_policy_data;
1034 LDLM_DEBUG(lock, "matching against this");
1036 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1037 &lock->l_resource->lr_name),
1038 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1039 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1040 LDLM_LOCK_PUT(lock);
1042 memcpy(&old_lock, lockh, sizeof(*lockh));
1043 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1044 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1045 ldlm_lock_decref_and_cancel(lockh,
1046 it->d.lustre.it_lock_mode);
1047 memcpy(lockh, &old_lock, sizeof(old_lock));
1048 it->d.lustre.it_lock_handle = lockh->cookie;
1051 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1052 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1053 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Check whether we still hold a lock that covers @fid for this intent.
 * First try the handle cached in the intent; otherwise match by fid with
 * an inodebits mask chosen per intent op (getattr needs UPDATE|LOOKUP and
 * more for old MDTs; layout needs LAYOUT; default is LOOKUP).  On a hit
 * the intent's lock handle/mode are updated; on a miss they are zeroed.
 * Returns the matched mode (0 for none — inferred; the final RETURN is on
 * an elided line). */
1057 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1058 struct lu_fid *fid, __u64 *bits)
1060 /* We could just return 1 immediately, but since we should only
1061 * be called in revalidate_it if we already have a lock, let's
1063 struct ldlm_res_id res_id;
1064 struct lustre_handle lockh;
1065 ldlm_policy_data_t policy;
1069 if (it->d.lustre.it_lock_handle) {
1070 lockh.cookie = it->d.lustre.it_lock_handle;
1071 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1073 fid_build_reg_res_name(fid, &res_id);
1074 switch (it->it_op) {
1076 /* File attributes are held under multiple bits:
1077 * nlink is under lookup lock, size and times are
1078 * under UPDATE lock and recently we've also got
1079 * a separate permissions lock for owner/group/acl that
1080 * were protected by lookup lock before.
1081 * Getattr must provide all of that information,
1082 * so we need to ensure we have all of those locks.
1083 * Unfortunately, if the bits are split across multiple
1084 * locks, there's no easy way to match all of them here,
1085 * so an extra RPC would be performed to fetch all
1086 * of those bits at once for now. */
1087 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1088 * but for old MDTs (< 2.4), permission is covered
1089 * by LOOKUP lock, so it needs to match all bits here.*/
1090 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1091 MDS_INODELOCK_LOOKUP |
1095 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1098 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1101 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1105 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1106 LDLM_IBITS, &policy,
1107 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1112 it->d.lustre.it_lock_handle = lockh.cookie;
1113 it->d.lustre.it_lock_mode = mode;
1115 it->d.lustre.it_lock_handle = 0;
1116 it->d.lustre.it_lock_mode = 0;
1123 * This long block is all about fixing up the lock and request state
1124 * so that it is correct as of the moment _before_ the operation was
1125 * applied; that way, the VFS will think that everything is normal and
1126 * call Lustre's regular VFS methods.
1128 * If we're performing a creation, that means that unless the creation
1129 * failed with EEXIST, we should fake up a negative dentry.
1131 * For everything else, we want to lookup to succeed.
1133 * One additional note: if CREATE or OPEN succeeded, we add an extra
1134 * reference to the request because we need to keep it around until
1135 * ll_create/ll_open gets called.
1137 * The server will return to us, in it_disposition, an indication of
1138 * exactly what d.lustre.it_status refers to.
1140 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1141 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1142 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1143 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1146 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1149 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1150 void *lmm, int lmmsize, struct lookup_intent *it,
1151 int lookup_flags, struct ptlrpc_request **reqp,
1152 ldlm_blocking_callback cb_blocking,
1153 __u64 extra_lock_flags)
1155 struct ldlm_enqueue_info einfo = {
1156 .ei_type = LDLM_IBITS,
1157 .ei_mode = it_to_lock_mode(it),
1158 .ei_cb_bl = cb_blocking,
1159 .ei_cb_cp = ldlm_completion_ast,
1161 struct lustre_handle lockh;
1166 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1167 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1168 op_data->op_name, PFID(&op_data->op_fid2),
1169 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Fast path: revalidate an existing lock for lookup/getattr/readdir on a
 * known fid before issuing a fresh enqueue. */
1173 if (fid_is_sane(&op_data->op_fid2) &&
1174 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1175 /* We could just return 1 immediately, but since we should only
1176 * be called in revalidate_it if we already have a lock, let's
1178 it->d.lustre.it_lock_handle = 0;
1179 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1180 /* Only return failure if it was not GETATTR by cfid
1181 (from inode_revalidate) */
1182 if (rc || op_data->op_namelen != 0)
1186 /* For case if upper layer did not alloc fid, do it now. */
1187 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1188 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1190 CERROR("Can't alloc new fid, rc %d\n", rc);
1194 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
/* Hand the reply back to the caller and finish VFS-side bookkeeping. */
1199 *reqp = it->d.lustre.it_data;
1200 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* ptlrpc interpret callback for the async getattr enqueue started by
 * mdc_intent_getattr_async(): release the in-flight slot, finalize the
 * ldlm enqueue, byte-swap the intent status, then run mdc_finish_enqueue()
 * and mdc_finish_intent_lock().  Always frees the einfo allocated by the
 * caller and invokes the caller's mi_cb with the final rc. */
1204 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1205 struct ptlrpc_request *req,
1208 struct mdc_getattr_args *ga = args;
1209 struct obd_export *exp = ga->ga_exp;
1210 struct md_enqueue_info *minfo = ga->ga_minfo;
1211 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1212 struct lookup_intent *it;
1213 struct lustre_handle *lockh;
1214 struct obd_device *obddev;
1215 struct ldlm_reply *lockrep;
1216 __u64 flags = LDLM_FL_HAS_INTENT;
1220 lockh = &minfo->mi_lockh;
1222 obddev = class_exp2obd(exp);
1224 mdc_exit_request(&obddev->u.cli);
1225 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1228 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1229 &flags, NULL, 0, lockh, rc);
1231 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1232 mdc_clear_replay_flag(req, rc);
1236 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1237 LASSERT(lockrep != NULL);
/* Intent status is carried in lock_policy_res2; convert from wire. */
1239 lockrep->lock_policy_res2 =
1240 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1242 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1246 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1250 OBD_FREE_PTR(einfo);
1251 minfo->mi_cb(req, minfo, rc);
1255 int mdc_intent_getattr_async(struct obd_export *exp,
1256 struct md_enqueue_info *minfo,
1257 struct ldlm_enqueue_info *einfo)
1259 struct md_op_data *op_data = &minfo->mi_data;
1260 struct lookup_intent *it = &minfo->mi_it;
1261 struct ptlrpc_request *req;
1262 struct mdc_getattr_args *ga;
1263 struct obd_device *obddev = class_exp2obd(exp);
1264 struct ldlm_res_id res_id;
1265 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1266 * for statahead currently. Consider CMD in future, such two bits
1267 * maybe managed by different MDS, should be adjusted then. */
1268 ldlm_policy_data_t policy = {
1269 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1270 MDS_INODELOCK_UPDATE }
1273 __u64 flags = LDLM_FL_HAS_INTENT;
1276 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1277 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1278 ldlm_it2str(it->it_op), it->it_flags);
1280 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1281 req = mdc_intent_getattr_pack(exp, it, op_data);
1283 RETURN(PTR_ERR(req));
1285 rc = mdc_enter_request(&obddev->u.cli);
1287 ptlrpc_req_finished(req);
1291 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1292 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1294 mdc_exit_request(&obddev->u.cli);
1295 ptlrpc_req_finished(req);
1299 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1300 ga = ptlrpc_req_async_args(req);
1302 ga->ga_minfo = minfo;
1303 ga->ga_einfo = einfo;
1305 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1306 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);