4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
43 # include <liblustre.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
/*
 * Context saved across an asynchronous getattr enqueue.  Stashed in the
 * request's async-args area by mdc_intent_getattr_async() and unpacked
 * again in mdc_intent_getattr_async_interpret() when the reply arrives.
 */
55 struct mdc_getattr_args {
56 struct obd_export *ga_exp;
57 struct md_enqueue_info *ga_minfo;
58 struct ldlm_enqueue_info *ga_einfo;
/*
 * Return the server-side status for the first intent phase (lease, open,
 * create, lookup, intent-exec) whose disposition bit is set and which is at
 * or past the @phase the caller asks about.  Checks run from most specific
 * (DISP_OPEN_LEASE) to least specific (DISP_IT_EXECD); the visible branches
 * only show the error-propagation paths.  If no disposition bit at all is
 * set the CERROR at the bottom logs the raw disposition/status for debugging.
 */
61 int it_open_error(int phase, struct lookup_intent *it)
63 if (it_disposition(it, DISP_OPEN_LEASE)) {
64 if (phase >= DISP_OPEN_LEASE)
65 return it->d.lustre.it_status;
69 if (it_disposition(it, DISP_OPEN_OPEN)) {
70 if (phase >= DISP_OPEN_OPEN)
71 return it->d.lustre.it_status;
76 if (it_disposition(it, DISP_OPEN_CREATE)) {
77 if (phase >= DISP_OPEN_CREATE)
78 return it->d.lustre.it_status;
83 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
84 if (phase >= DISP_LOOKUP_EXECD)
85 return it->d.lustre.it_status;
90 if (it_disposition(it, DISP_IT_EXECD)) {
91 if (phase >= DISP_IT_EXECD)
92 return it->d.lustre.it_status;
/* No recognised disposition: dump state so the caller's bug is diagnosable. */
96 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
97 it->d.lustre.it_status);
101 EXPORT_SYMBOL(it_open_error);
103 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach the VFS inode @data to the DLM lock behind @lockh (stored in the
 * resource's lr_lvb_inode) so blocking callbacks can find the inode later.
 * If a different inode is already attached it must be on its way out
 * (I_FREEING) — anything else indicates an fid/inode aliasing bug, hence the
 * LASSERTF.  On success *bits reports the inodebits actually granted.
 */
104 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
107 struct ldlm_lock *lock;
108 struct inode *new_inode = data;
117 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
119 LASSERT(lock != NULL);
120 lock_res_and_lock(lock);
122 if (lock->l_resource->lr_lvb_inode &&
123 lock->l_resource->lr_lvb_inode != data) {
124 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
125 LASSERTF(old_inode->i_state & I_FREEING,
126 "Found existing inode %p/%lu/%u state %lu in lock: "
127 "setting data to %p/%lu/%u\n", old_inode,
128 old_inode->i_ino, old_inode->i_generation,
130 new_inode, new_inode->i_ino, new_inode->i_generation);
133 lock->l_resource->lr_lvb_inode = new_inode;
135 *bits = lock->l_policy_data.l_inodebits.bits;
137 unlock_res_and_lock(lock);
/*
 * Look for an already-granted MDC lock on @fid matching @type/@policy/@mode.
 * Bits the server does not support (older MDTs) are masked out of the policy
 * first (LU-4405) so the match is not needlessly strict.  Returns the
 * matched mode (handle in *lockh) or 0 — the semantics of ldlm_lock_match().
 */
143 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
144 const struct lu_fid *fid, ldlm_type_t type,
145 ldlm_policy_data_t *policy, ldlm_mode_t mode,
146 struct lustre_handle *lockh)
148 struct ldlm_res_id res_id;
152 fid_build_reg_res_name(fid, &res_id);
153 /* LU-4405: Clear bits not supported by server */
154 policy->l_inodebits.bits &= exp_connect_ibits(exp);
155 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
156 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused (no reader/writer references) DLM locks on @fid's
 * resource that match @policy/@mode; simply translates the fid into a
 * resource id and delegates to ldlm_cli_cancel_unused_resource().
 */
160 int mdc_cancel_unused(struct obd_export *exp,
161 const struct lu_fid *fid,
162 ldlm_policy_data_t *policy,
164 ldlm_cancel_flags_t flags,
167 struct ldlm_res_id res_id;
168 struct obd_device *obd = class_exp2obd(exp);
173 fid_build_reg_res_name(fid, &res_id);
174 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
175 policy, mode, flags, opaque);
/*
 * Detach the cached inode pointer from @fid's DLM resource (clear
 * lr_lvb_inode), used when the inode is going away so stale blocking
 * callbacks cannot dereference it.  A missing resource is harmless:
 * there is simply nothing to clear.
 */
179 int mdc_null_inode(struct obd_export *exp,
180 const struct lu_fid *fid)
182 struct ldlm_res_id res_id;
183 struct ldlm_resource *res;
184 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
187 LASSERTF(ns != NULL, "no namespace passed\n");
189 fid_build_reg_res_name(fid, &res_id);
191 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
196 res->lr_lvb_inode = NULL;
199 ldlm_resource_putref(res);
203 /* find any ldlm lock of the inode in mdc
 * Iterate all DLM locks on @fid's resource with @it; translates the
 * iterator's LDLM_ITER_STOP ("found a lock") / LDLM_ITER_CONTINUE
 * ("nothing found") results into this function's return convention
 * (the actual return statements fall on elided lines). */
207 int mdc_find_cbdata(struct obd_export *exp,
208 const struct lu_fid *fid,
209 ldlm_iterator_t it, void *data)
211 struct ldlm_res_id res_id;
215 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
216 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
218 if (rc == LDLM_ITER_STOP)
220 else if (rc == LDLM_ITER_CONTINUE)
/*
 * Drop @req from the replay list when the operation failed with @rc: a
 * failed request must not be replayed after recovery.  Clearing rq_replay
 * is done under rq_lock.  A non-zero transno on a failed request is
 * unexpected (transactions should not commit on error) — log it loudly.
 */
225 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
227 /* Don't hold error requests for replay. */
228 if (req->rq_replay) {
229 spin_lock(&req->rq_lock);
231 spin_unlock(&req->rq_lock);
233 if (rc && req->rq_transno != 0) {
234 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
239 /* Save a large LOV EA into the request buffer so that it is available
240 * for replay. We don't do this in the initial request because the
241 * original request doesn't need this buffer (at most it sends just the
242 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
243 * buffer and may also be difficult to allocate and save a very large
244 * request buffer for each open. (bug 5707)
246 * OOM here may cause recovery failure if lmm is needed (only for the
247 * original open if the MDS crashed just when this client also OOM'd)
248 * but this is incredibly unlikely, and questionable whether the client
249 * could do MDS recovery under OOM anyways... */
250 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
251 struct mdt_body *body)
255 /* FIXME: remove this explicit offset. */
256 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
/* Enlargement failed: give up on saving the EA for replay rather than
 * failing the open — strip OBD_MD_FLEASIZE so nobody expects it. */
259 CERROR("Can't enlarge segment %d size to %d\n",
260 DLM_INTENT_REC_OFF + 4, body->eadatasize);
261 body->valid &= ~OBD_MD_FLEASIZE;
262 body->eadatasize = 0;
/*
 * Build an LDLM_INTENT_OPEN request: collect conflicting locks to cancel
 * (OPEN locks on the child if its fid is known, the parent's UPDATE lock
 * for a create), allocate and size the request (name, EA/lmm buffers,
 * capabilities), mark it replayable, and pack the ldlm intent plus the
 * open body.  Returns the prepared request or ERR_PTR on failure.
 */
266 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
267 struct lookup_intent *it,
268 struct md_op_data *op_data,
269 void *lmm, int lmmsize,
272 struct ptlrpc_request *req;
273 struct obd_device *obddev = class_exp2obd(exp);
274 struct ldlm_intent *lit;
275 CFS_LIST_HEAD(cancels);
/* Only regular files can be opened; force S_IFREG into the create mode. */
281 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
283 /* XXX: openlock is not cancelled for cross-refs. */
284 /* If inode is known, cancel conflicting OPEN locks. */
285 if (fid_is_sane(&op_data->op_fid2)) {
286 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
287 if (it->it_flags & FMODE_WRITE)
292 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
295 else if (it->it_flags & FMODE_EXEC)
301 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
306 /* If CREATE, cancel parent's UPDATE lock. */
307 if (it->it_op & IT_CREAT)
311 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
313 MDS_INODELOCK_UPDATE);
315 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
316 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the early-cancel locks we collected. */
318 ldlm_lock_list_put(&cancels, l_bl_ast, count);
319 RETURN(ERR_PTR(-ENOMEM));
322 /* parent capability */
323 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
324 /* child capability, reserve the size according to parent capa, it will
325 * be filled after we get the reply */
326 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
328 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
329 op_data->op_namelen + 1);
330 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
331 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
333 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
335 ptlrpc_request_free(req);
/* Opens must be replayable across MDS recovery if the import allows it. */
339 spin_lock(&req->rq_lock);
340 req->rq_replay = req->rq_import->imp_replayable;
341 spin_unlock(&req->rq_lock);
343 /* pack the intent */
344 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
345 lit->opc = (__u64)it->it_op;
347 /* pack the intended request */
348 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
351 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
352 obddev->u.cli.cl_max_mds_easize);
354 /* for remote client, fetch remote perm for current user */
355 if (client_is_remote(exp))
356 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
357 sizeof(struct mdt_remote_perm));
358 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETXATTR request.  The reply buffers (xattr names,
 * values, value lengths) are each sized to the server's advertised
 * ocd_max_easize so a full xattr listing fits in one round trip.
 * Returns the prepared request or ERR_PTR on failure.
 */
362 static struct ptlrpc_request *
363 mdc_intent_getxattr_pack(struct obd_export *exp,
364 struct lookup_intent *it,
365 struct md_op_data *op_data)
367 struct ptlrpc_request *req;
368 struct ldlm_intent *lit;
369 int rc, count = 0, maxdata;
370 CFS_LIST_HEAD(cancels);
374 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
375 &RQF_LDLM_INTENT_GETXATTR);
377 RETURN(ERR_PTR(-ENOMEM));
379 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
381 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
383 ptlrpc_request_free(req);
387 /* pack the intent */
388 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
389 lit->opc = IT_GETXATTR;
391 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
393 /* pack the intended request */
394 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
395 op_data->op_valid, maxdata, -1, 0);
397 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
398 RCL_SERVER, maxdata);
400 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
401 RCL_SERVER, maxdata);
403 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
404 RCL_SERVER, maxdata);
406 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_UNLINK request: pack the intent opcode and the
 * unlink body, and reserve reply room for the victim's striping (MDT_MD)
 * and unlink cookies (ACL field reused for cookies here, sized by
 * cl_max_mds_cookiesize).  Returns the prepared request or ERR_PTR.
 */
411 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
412 struct lookup_intent *it,
413 struct md_op_data *op_data)
415 struct ptlrpc_request *req;
416 struct obd_device *obddev = class_exp2obd(exp);
417 struct ldlm_intent *lit;
421 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
422 &RQF_LDLM_INTENT_UNLINK);
424 RETURN(ERR_PTR(-ENOMEM));
426 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
427 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
428 op_data->op_namelen + 1);
430 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
432 ptlrpc_request_free(req);
436 /* pack the intent */
437 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
438 lit->opc = (__u64)it->it_op;
440 /* pack the intended request */
441 mdc_unlink_pack(req, op_data);
443 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
444 obddev->u.cli.cl_max_mds_easize);
445 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
446 obddev->u.cli.cl_max_mds_cookiesize);
447 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR request asking for the full attribute set
 * (getattr, EA size, dir EA, MDS capability, MEA) plus either remote
 * permissions or ACLs depending on whether the client is remote.  Reserves
 * reply space for the largest possible striping.  Returns request/ERR_PTR.
 */
451 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
452 struct lookup_intent *it,
453 struct md_op_data *op_data)
455 struct ptlrpc_request *req;
456 struct obd_device *obddev = class_exp2obd(exp);
457 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
458 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
459 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
460 (client_is_remote(exp) ?
461 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
462 struct ldlm_intent *lit;
466 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
467 &RQF_LDLM_INTENT_GETATTR);
469 RETURN(ERR_PTR(-ENOMEM));
471 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
472 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
473 op_data->op_namelen + 1);
475 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
477 ptlrpc_request_free(req);
481 /* pack the intent */
482 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
483 lit->opc = (__u64)it->it_op;
485 /* pack the intended request */
486 mdc_getattr_pack(req, valid, it->it_flags, op_data,
487 obddev->u.cli.cl_max_mds_easize);
489 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
490 obddev->u.cli.cl_max_mds_easize);
491 if (client_is_remote(exp))
492 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
493 sizeof(struct mdt_remote_perm));
494 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT request used to fetch a file's layout under a
 * layout lock.  The client-side EADATA segment is sized to zero (nothing to
 * send), the layout_intent is set to the generic LAYOUT_INTENT_ACCESS, and
 * reply room for the layout LVB is reserved at cl_max_mds_easize.
 */
498 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
499 struct lookup_intent *it,
500 struct md_op_data *unused)
502 struct obd_device *obd = class_exp2obd(exp);
503 struct ptlrpc_request *req;
504 struct ldlm_intent *lit;
505 struct layout_intent *layout;
509 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
510 &RQF_LDLM_INTENT_LAYOUT);
512 RETURN(ERR_PTR(-ENOMEM));
514 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
515 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
517 ptlrpc_request_free(req);
521 /* pack the intent */
522 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
523 lit->opc = (__u64)it->it_op;
525 /* pack the layout intent request */
526 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
527 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
528 * set for replication */
529 layout->li_opc = LAYOUT_INTENT_ACCESS;
531 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
532 obd->u.cli.cl_max_mds_easize);
533 ptlrpc_request_set_replen(req);
/*
 * Build a plain (intent-less) LDLM_ENQUEUE request, reserving @lvb_len
 * bytes of reply room for the lock value block.  Used e.g. for IT_READDIR
 * where no intent body is needed.  Returns the request or ERR_PTR.
 */
537 static struct ptlrpc_request *
538 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
540 struct ptlrpc_request *req;
544 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
546 RETURN(ERR_PTR(-ENOMEM));
548 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
550 ptlrpc_request_free(req);
554 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
555 ptlrpc_request_set_replen(req);
/*
 * Post-process an intent enqueue reply: fix up the replay flags and lock
 * mode, copy the server's disposition/status into the intent, swab and
 * validate the reply body for the relevant intent type, save a large LOV EA
 * back into the request for replay (open), and install layout LVB data on
 * the lock for IT_LAYOUT.  Statement order here is delicate (replay-flag
 * clearing must precede everything that can exit) — do not reorder.
 */
559 static int mdc_finish_enqueue(struct obd_export *exp,
560 struct ptlrpc_request *req,
561 struct ldlm_enqueue_info *einfo,
562 struct lookup_intent *it,
563 struct lustre_handle *lockh,
566 struct req_capsule *pill = &req->rq_pill;
567 struct ldlm_request *lockreq;
568 struct ldlm_reply *lockrep;
569 struct lustre_intent_data *intent = &it->d.lustre;
570 struct ldlm_lock *lock;
571 void *lvb_data = NULL;
576 /* Similarly, if we're going to replay this request, we don't want to
577 * actually get a lock, just perform the intent. */
578 if (req->rq_transno || req->rq_replay) {
579 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
580 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
583 if (rc == ELDLM_LOCK_ABORTED) {
/* Server executed the intent but granted no lock: clear the handle. */
585 memset(lockh, 0, sizeof(*lockh));
587 } else { /* rc = 0 */
588 lock = ldlm_handle2lock(lockh);
589 LASSERT(lock != NULL);
591 /* If the server gave us back a different lock mode, we should
592 * fix up our variables. */
593 if (lock->l_req_mode != einfo->ei_mode) {
594 ldlm_lock_addref(lockh, lock->l_req_mode);
595 ldlm_lock_decref(lockh, einfo->ei_mode);
596 einfo->ei_mode = lock->l_req_mode;
601 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
602 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Copy the server's verdict into the caller-visible intent data. */
604 intent->it_disposition = (int)lockrep->lock_policy_res1;
605 intent->it_status = (int)lockrep->lock_policy_res2;
606 intent->it_lock_mode = einfo->ei_mode;
607 intent->it_lock_handle = lockh->cookie;
608 intent->it_data = req;
610 /* Technically speaking rq_transno must already be zero if
611 * it_status is in error, so the check is a bit redundant */
612 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
613 mdc_clear_replay_flag(req, intent->it_status);
615 /* If we're doing an IT_OPEN which did not result in an actual
616 * successful open, then we need to remove the bit which saves
617 * this request for unconditional replay.
619 * It's important that we do this first! Otherwise we might exit the
620 * function without doing so, and try to replay a failed create
622 if (it->it_op & IT_OPEN && req->rq_replay &&
623 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
624 mdc_clear_replay_flag(req, intent->it_status);
626 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
627 it->it_op, intent->it_disposition, intent->it_status);
629 /* We know what to expect, so we do any byte flipping required here */
630 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
631 struct mdt_body *body;
633 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
635 CERROR ("Can't swab mdt_body\n");
639 if (it_disposition(it, DISP_OPEN_OPEN) &&
640 !it_open_error(DISP_OPEN_OPEN, it)) {
642 * If this is a successful OPEN request, we need to set
643 * replay handler and data early, so that if replay
644 * happens immediately after swabbing below, new reply
645 * is swabbed by that handler correctly.
647 mdc_set_open_replay_data(NULL, NULL, it);
650 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
653 mdc_update_max_ea_from_body(exp, body);
656 * The eadata is opaque; just check that it is there.
657 * Eventually, obd_unpackmd() will check the contents.
659 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
664 /* save lvb data and length in case this is for layout
667 lvb_len = body->eadatasize;
670 * We save the reply LOV EA in case we have to replay a
671 * create for recovery. If we didn't allocate a large
672 * enough request buffer above we need to reallocate it
673 * here to hold the actual LOV EA.
675 * To not save LOV EA if request is not going to replay
676 * (for example error one).
678 if ((it->it_op & IT_OPEN) && req->rq_replay) {
680 if (req_capsule_get_size(pill, &RMF_EADATA,
683 mdc_realloc_openmsg(req, body);
685 req_capsule_shrink(pill, &RMF_EADATA,
689 req_capsule_set_size(pill, &RMF_EADATA,
693 lmm = req_capsule_client_get(pill, &RMF_EADATA);
695 memcpy(lmm, eadata, body->eadatasize);
699 if (body->valid & OBD_MD_FLRMTPERM) {
700 struct mdt_remote_perm *perm;
702 LASSERT(client_is_remote(exp));
703 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
704 lustre_swab_mdt_remote_perm);
708 if (body->valid & OBD_MD_FLMDSCAPA) {
709 struct lustre_capa *capa, *p;
711 capa = req_capsule_server_get(pill, &RMF_CAPA1);
715 if (it->it_op & IT_OPEN) {
716 /* client fid capa will be checked in replay */
717 p = req_capsule_client_get(pill, &RMF_CAPA2);
722 if (body->valid & OBD_MD_FLOSSCAPA) {
723 struct lustre_capa *capa;
725 capa = req_capsule_server_get(pill, &RMF_CAPA2);
729 } else if (it->it_op & IT_LAYOUT) {
730 /* maybe the lock was granted right away and layout
731 * is packed into RMF_DLM_LVB of req */
732 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
734 lvb_data = req_capsule_server_sized_get(pill,
735 &RMF_DLM_LVB, lvb_len);
736 if (lvb_data == NULL)
741 /* fill in stripe data for layout lock */
742 lock = ldlm_handle2lock(lockh);
743 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
746 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
747 ldlm_it2str(it->it_op), lvb_len);
749 OBD_ALLOC_LARGE(lmm, lvb_len);
754 memcpy(lmm, lvb_data, lvb_len);
756 /* install lvb_data */
757 lock_res_and_lock(lock);
758 if (lock->l_lvb_data == NULL) {
759 lock->l_lvb_type = LVB_T_LAYOUT;
760 lock->l_lvb_data = lmm;
761 lock->l_lvb_len = lvb_len;
764 unlock_res_and_lock(lock);
/* Someone else installed LVB data first: free our private copy. */
766 OBD_FREE_LARGE(lmm, lvb_len);
774 /* We always reserve enough space in the reply packet for a stripe MD, because
775 * we don't know in advance the file type. */
/*
 * Take an inodebits (or flock) lock on the MDT, optionally carrying an
 * intent (open/unlink/getattr/lookup/layout/getxattr).  Picks the policy
 * bits from the intent op, packs the matching request, throttles via the
 * rpc_lock and rpcs-in-flight counter, enqueues, and retries indefinitely
 * on -EINPROGRESS for creates (quota design requirement) as long as the
 * import generation is unchanged.  Finishes via mdc_finish_enqueue().
 */
776 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
777 struct lookup_intent *it, struct md_op_data *op_data,
778 struct lustre_handle *lockh, void *lmm, int lmmsize,
779 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
781 struct obd_device *obddev = class_exp2obd(exp);
782 struct ptlrpc_request *req = NULL;
783 __u64 flags, saved_flags = extra_lock_flags;
785 struct ldlm_res_id res_id;
786 static const ldlm_policy_data_t lookup_policy =
787 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
788 static const ldlm_policy_data_t update_policy =
789 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
790 static const ldlm_policy_data_t layout_policy =
791 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
792 static const ldlm_policy_data_t getxattr_policy = {
793 .l_inodebits = { MDS_INODELOCK_XATTR } };
794 ldlm_policy_data_t const *policy = &lookup_policy;
795 int generation, resends = 0;
796 struct ldlm_reply *lockrep;
797 enum lvb_type lvb_type = 0;
800 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
803 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* Select the inodebits policy that matches the intent operation. */
806 saved_flags |= LDLM_FL_HAS_INTENT;
807 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
808 policy = &update_policy;
809 else if (it->it_op & IT_LAYOUT)
810 policy = &layout_policy;
811 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
812 policy = &getxattr_policy;
815 LASSERT(reqp == NULL);
/* Remember the import generation so resends can detect eviction. */
817 generation = obddev->u.cli.cl_import->imp_generation;
821 /* The only way right now is FLOCK, in this case we hide flock
822 policy as lmm, but lmmsize is 0 */
823 LASSERT(lmm && lmmsize == 0);
824 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
826 policy = (ldlm_policy_data_t *)lmm;
827 res_id.name[3] = LDLM_FLOCK;
828 } else if (it->it_op & IT_OPEN) {
829 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
831 policy = &update_policy;
832 einfo->ei_cbdata = NULL;
834 } else if (it->it_op & IT_UNLINK) {
835 req = mdc_intent_unlink_pack(exp, it, op_data);
836 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
837 req = mdc_intent_getattr_pack(exp, it, op_data);
838 } else if (it->it_op & IT_READDIR) {
839 req = mdc_enqueue_pack(exp, 0);
840 } else if (it->it_op & IT_LAYOUT) {
841 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
843 req = mdc_intent_layout_pack(exp, it, op_data);
844 lvb_type = LVB_T_LAYOUT;
845 } else if (it->it_op & IT_GETXATTR) {
846 req = mdc_intent_getxattr_pack(exp, it, op_data);
853 RETURN(PTR_ERR(req));
855 if (req != NULL && it && it->it_op & IT_CREAT)
856 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
858 req->rq_no_retry_einprogress = 1;
861 req->rq_generation_set = 1;
862 req->rq_import_generation = generation;
863 req->rq_sent = cfs_time_current_sec() + resends;
866 /* It is important to obtain rpc_lock first (if applicable), so that
867 * threads that are serialised with rpc_lock are not polluting our
868 * rpcs in flight counter. We do not do flock request limiting, though*/
870 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
871 rc = mdc_enter_request(&obddev->u.cli);
873 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
874 mdc_clear_replay_flag(req, 0);
875 ptlrpc_req_finished(req);
880 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
881 0, lvb_type, lockh, 0);
883 /* For flock requests we immediatelly return without further
884 delay and let caller deal with the rest, since rest of
885 this function metadata processing makes no sense for flock
886 requests anyway. But in case of problem during comms with
887 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
888 can not rely on caller and this mainly for F_UNLCKs
889 (explicits or automatically generated by Kernel to clean
890 current FLocks upon exit) that can't be trashed */
891 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
892 (einfo->ei_type == LDLM_FLOCK) &&
893 (einfo->ei_mode == LCK_NL))
898 mdc_exit_request(&obddev->u.cli);
899 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
902 CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
903 "%s: ldlm_cli_enqueue failed: rc = %d\n",
904 obddev->obd_name, rc);
906 mdc_clear_replay_flag(req, rc);
907 ptlrpc_req_finished(req);
911 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
912 LASSERT(lockrep != NULL);
/* lock_policy_res2 carries the intent status in network byte order. */
914 lockrep->lock_policy_res2 =
915 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
917 /* Retry the create infinitely when we get -EINPROGRESS from
918 * server. This is required by the new quota design. */
919 if (it && it->it_op & IT_CREAT &&
920 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
921 mdc_clear_replay_flag(req, rc);
922 ptlrpc_req_finished(req);
925 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
926 obddev->obd_name, resends, it->it_op,
927 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
929 if (generation == obddev->u.cli.cl_import->imp_generation) {
932 CDEBUG(D_HA, "resend cross eviction\n");
937 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Finish failed: drop the granted lock reference and the request. */
939 if (lustre_handle_is_used(lockh)) {
940 ldlm_lock_decref(lockh, einfo->ei_mode);
941 memset(lockh, 0, sizeof(*lockh));
943 ptlrpc_req_finished(req);
/*
 * Fix up lock/request state after an intent enqueue so that the VFS sees a
 * normal lookup result: propagate intent-phase errors, detect stale
 * revalidations (server returned a different fid), take extra request
 * references for successful CREATE/OPEN (released later by
 * ll_create_node/ll_file_open), and collapse a duplicate lock against an
 * existing matching one.
 */
948 static int mdc_finish_intent_lock(struct obd_export *exp,
949 struct ptlrpc_request *request,
950 struct md_op_data *op_data,
951 struct lookup_intent *it,
952 struct lustre_handle *lockh)
954 struct lustre_handle old_lock;
955 struct mdt_body *mdt_body;
956 struct ldlm_lock *lock;
960 LASSERT(request != NULL);
961 LASSERT(request != LP_POISON);
962 LASSERT(request->rq_repmsg != LP_POISON);
964 if (it->it_op & IT_READDIR)
967 if (!it_disposition(it, DISP_IT_EXECD)) {
968 /* The server failed before it even started executing the
969 * intent, i.e. because it couldn't unpack the request. */
970 LASSERT(it->d.lustre.it_status != 0);
971 RETURN(it->d.lustre.it_status);
973 rc = it_open_error(DISP_IT_EXECD, it);
977 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
978 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
980 /* If we were revalidating a fid/name pair, mark the intent in
981 * case we fail and get called again from lookup */
982 if (fid_is_sane(&op_data->op_fid2) &&
983 it->it_create_mode & M_CHECK_STALE &&
984 it->it_op != IT_GETATTR) {
985 /* Also: did we find the same inode? */
986 /* sever can return one of two fids:
987 * op_fid2 - new allocated fid - if file is created.
988 * op_fid3 - existent fid - if file only open.
989 * op_fid3 is saved in lmv_intent_open */
990 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
991 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
992 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
993 "\n", PFID(&op_data->op_fid2),
994 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
999 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1003 /* keep requests around for the multiple phases of the call
1004 * this shows the DISP_XX must guarantee we make it into the call
1006 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1007 it_disposition(it, DISP_OPEN_CREATE) &&
1008 !it_open_error(DISP_OPEN_CREATE, it)) {
1009 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1010 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1012 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1013 it_disposition(it, DISP_OPEN_OPEN) &&
1014 !it_open_error(DISP_OPEN_OPEN, it)) {
1015 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1016 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1017 /* BUG 11546 - eviction in the middle of open rpc processing */
1018 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1021 if (it->it_op & IT_CREAT) {
1022 /* XXX this belongs in ll_create_it */
1023 } else if (it->it_op == IT_OPEN) {
1024 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1026 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1029 /* If we already have a matching lock, then cancel the new
1030 * one. We have to set the data here instead of in
1031 * mdc_enqueue, because we need to use the child's inode as
1032 * the l_ast_data to match, and that's not available until
1033 * intent_finish has performed the iget().) */
1034 lock = ldlm_handle2lock(lockh);
1036 ldlm_policy_data_t policy = lock->l_policy_data;
1037 LDLM_DEBUG(lock, "matching against this");
1039 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1040 &lock->l_resource->lr_name),
1041 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1042 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1043 LDLM_LOCK_PUT(lock);
1045 memcpy(&old_lock, lockh, sizeof(*lockh));
1046 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1047 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Duplicate: cancel the fresh lock and hand back the existing one. */
1048 ldlm_lock_decref_and_cancel(lockh,
1049 it->d.lustre.it_lock_mode);
1050 memcpy(lockh, &old_lock, sizeof(old_lock));
1051 it->d.lustre.it_lock_handle = lockh->cookie;
1054 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1055 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1056 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * Check whether we still hold a usable lock for @it on @fid.  First tries
 * the handle cached in the intent; otherwise matches by fid with inodebits
 * chosen per intent op (getattr needs UPDATE|LOOKUP|PERM in one lock, see
 * comment below).  On success the intent's lock handle/mode are refreshed;
 * on failure they are zeroed.
 */
1060 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1061 struct lu_fid *fid, __u64 *bits)
1063 /* We could just return 1 immediately, but since we should only
1064 * be called in revalidate_it if we already have a lock, let's
1066 struct ldlm_res_id res_id;
1067 struct lustre_handle lockh;
1068 ldlm_policy_data_t policy;
1072 if (it->d.lustre.it_lock_handle) {
1073 lockh.cookie = it->d.lustre.it_lock_handle;
1074 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1076 fid_build_reg_res_name(fid, &res_id);
1077 switch (it->it_op) {
1079 /* File attributes are held under multiple bits:
1080 * nlink is under lookup lock, size and times are
1081 * under UPDATE lock and recently we've also got
1082 * a separate permissions lock for owner/group/acl that
1083 * were protected by lookup lock before.
1084 * Getattr must provide all of that information,
1085 * so we need to ensure we have all of those locks.
1086 * Unfortunately, if the bits are split across multiple
1087 * locks, there's no easy way to match all of them here,
1088 * so an extra RPC would be performed to fetch all
1089 * of those bits at once for now. */
1090 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1091 * but for old MDTs (< 2.4), permission is covered
1092 * by LOOKUP lock, so it needs to match all bits here.*/
1093 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1094 MDS_INODELOCK_LOOKUP |
1098 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1101 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1104 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1108 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1109 LDLM_IBITS, &policy,
1110 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1115 it->d.lustre.it_lock_handle = lockh.cookie;
1116 it->d.lustre.it_lock_mode = mode;
1118 it->d.lustre.it_lock_handle = 0;
1119 it->d.lustre.it_lock_mode = 0;
/*
1126 * This long block is all about fixing up the lock and request state
1127 * so that it is correct as of the moment _before_ the operation was
1128 * applied; that way, the VFS will think that everything is normal and
1129 * call Lustre's regular VFS methods.
1131 * If we're performing a creation, that means that unless the creation
1132 * failed with EEXIST, we should fake up a negative dentry.
1134 * For everything else, we want to lookup to succeed.
1136 * One additional note: if CREATE or OPEN succeeded, we add an extra
1137 * reference to the request because we need to keep it around until
1138 * ll_create/ll_open gets called.
1140 * The server will return to us, in it_disposition, an indication of
1141 * exactly what d.lustre.it_status refers to.
1143 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1144 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1145 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1146 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1149 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
 */
/*
 * Entry point for intent-based locking from the VFS layer: revalidate an
 * existing lock where possible, allocate a fid for creates if the caller
 * did not, enqueue the intent lock, then hand the reply to
 * mdc_finish_intent_lock() (reply returned to the caller through *reqp).
 */
1152 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1153 void *lmm, int lmmsize, struct lookup_intent *it,
1154 int lookup_flags, struct ptlrpc_request **reqp,
1155 ldlm_blocking_callback cb_blocking,
1156 __u64 extra_lock_flags)
1158 struct ldlm_enqueue_info einfo = {
1159 .ei_type = LDLM_IBITS,
1160 .ei_mode = it_to_lock_mode(it),
1161 .ei_cb_bl = cb_blocking,
1162 .ei_cb_cp = ldlm_completion_ast,
1164 struct lustre_handle lockh;
1169 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1170 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1171 op_data->op_name, PFID(&op_data->op_fid2),
1172 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1176 if (fid_is_sane(&op_data->op_fid2) &&
1177 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1178 /* We could just return 1 immediately, but since we should only
1179 * be called in revalidate_it if we already have a lock, let's
1181 it->d.lustre.it_lock_handle = 0;
1182 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1183 /* Only return failure if it was not GETATTR by cfid
1184 (from inode_revalidate) */
1185 if (rc || op_data->op_namelen != 0)
1189 /* For case if upper layer did not alloc fid, do it now. */
1190 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1191 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1193 CERROR("Can't alloc new fid, rc %d\n", rc);
1197 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1202 *reqp = it->d.lustre.it_data;
1203 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply interpreter for the async getattr enqueue: releases the
 * rpcs-in-flight slot, finalizes the enqueue via ldlm_cli_enqueue_fini(),
 * runs the usual mdc_finish_enqueue()/mdc_finish_intent_lock() pipeline,
 * then frees the einfo and invokes the caller's completion callback
 * (minfo->mi_cb) with the final rc.
 */
1207 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1208 struct ptlrpc_request *req,
1211 struct mdc_getattr_args *ga = args;
1212 struct obd_export *exp = ga->ga_exp;
1213 struct md_enqueue_info *minfo = ga->ga_minfo;
1214 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1215 struct lookup_intent *it;
1216 struct lustre_handle *lockh;
1217 struct obd_device *obddev;
1218 struct ldlm_reply *lockrep;
1219 __u64 flags = LDLM_FL_HAS_INTENT;
1223 lockh = &minfo->mi_lockh;
1225 obddev = class_exp2obd(exp);
1227 mdc_exit_request(&obddev->u.cli);
1228 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1231 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1232 &flags, NULL, 0, lockh, rc);
1234 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1235 mdc_clear_replay_flag(req, rc);
1239 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1240 LASSERT(lockrep != NULL);
/* Intent status travels in lock_policy_res2 in network byte order. */
1242 lockrep->lock_policy_res2 =
1243 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1245 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1249 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1253 OBD_FREE_PTR(einfo);
1254 minfo->mi_cb(req, minfo, rc);
1258 int mdc_intent_getattr_async(struct obd_export *exp,
1259 struct md_enqueue_info *minfo,
1260 struct ldlm_enqueue_info *einfo)
1262 struct md_op_data *op_data = &minfo->mi_data;
1263 struct lookup_intent *it = &minfo->mi_it;
1264 struct ptlrpc_request *req;
1265 struct mdc_getattr_args *ga;
1266 struct obd_device *obddev = class_exp2obd(exp);
1267 struct ldlm_res_id res_id;
1268 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1269 * for statahead currently. Consider CMD in future, such two bits
1270 * maybe managed by different MDS, should be adjusted then. */
1271 ldlm_policy_data_t policy = {
1272 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1273 MDS_INODELOCK_UPDATE }
1276 __u64 flags = LDLM_FL_HAS_INTENT;
1279 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1280 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1281 ldlm_it2str(it->it_op), it->it_flags);
1283 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1284 req = mdc_intent_getattr_pack(exp, it, op_data);
1286 RETURN(PTR_ERR(req));
1288 rc = mdc_enter_request(&obddev->u.cli);
1290 ptlrpc_req_finished(req);
1294 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1295 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1297 mdc_exit_request(&obddev->u.cli);
1298 ptlrpc_req_finished(req);
1302 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1303 ga = ptlrpc_req_async_args(req);
1305 ga->ga_minfo = minfo;
1306 ga->ga_einfo = einfo;
1308 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1309 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);