4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
43 # include <liblustre.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
/* Argument bundle handed to the async getattr intent's interpret callback
 * (see mdc_intent_getattr_async_interpret below), so it can complete the
 * enqueue once the RPC reply arrives. */
55 struct mdc_getattr_args {
56 struct obd_export *ga_exp;
57 struct md_enqueue_info *ga_minfo;
58 struct ldlm_enqueue_info *ga_einfo;
/* Test whether the server set disposition bit(s) @flag in this intent's
 * reply.  Returns the masked (nonzero) bits if set, 0 otherwise. */
61 int it_disposition(struct lookup_intent *it, int flag)
63 return it->d.lustre.it_disposition & flag;
65 EXPORT_SYMBOL(it_disposition);
/* Set disposition bit(s) @flag on the intent (client-side bookkeeping,
 * e.g. DISP_ENQ_OPEN_REF in mdc_finish_intent_lock). */
67 void it_set_disposition(struct lookup_intent *it, int flag)
69 it->d.lustre.it_disposition |= flag;
71 EXPORT_SYMBOL(it_set_disposition);
/* Clear disposition bit(s) @flag from the intent. */
73 void it_clear_disposition(struct lookup_intent *it, int flag)
75 it->d.lustre.it_disposition &= ~flag;
77 EXPORT_SYMBOL(it_clear_disposition);
/* Return the error status that applies to the given open @phase of an
 * intent, or the success/failure code when the intent progressed at least
 * that far.  Dispositions are checked from most specific (LEASE) down to
 * least specific (IT_EXECD); the first disposition found that is at or
 * beyond @phase yields it_status. */
79 int it_open_error(int phase, struct lookup_intent *it)
81 if (it_disposition(it, DISP_OPEN_LEASE)) {
82 if (phase >= DISP_OPEN_LEASE)
83 return it->d.lustre.it_status;
87 if (it_disposition(it, DISP_OPEN_OPEN)) {
88 if (phase >= DISP_OPEN_OPEN)
89 return it->d.lustre.it_status;
94 if (it_disposition(it, DISP_OPEN_CREATE)) {
95 if (phase >= DISP_OPEN_CREATE)
96 return it->d.lustre.it_status;
101 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
102 if (phase >= DISP_LOOKUP_EXECD)
103 return it->d.lustre.it_status;
108 if (it_disposition(it, DISP_IT_EXECD)) {
109 if (phase >= DISP_IT_EXECD)
110 return it->d.lustre.it_status;
/* No disposition matched at all: log the unexpected combination. */
114 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
115 it->d.lustre.it_status);
119 EXPORT_SYMBOL(it_open_error);
121 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach the VFS inode @data to the ldlm lock's resource (lr_lvb_inode),
 * replacing any previous inode only if that inode is being freed (the
 * LASSERTF enforces I_FREEING), and report the lock's inodebits back
 * through the trailing pointer argument (truncated from view here). */
122 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
125 struct ldlm_lock *lock;
126 struct inode *new_inode = data;
135 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
137 LASSERT(lock != NULL);
138 lock_res_and_lock(lock);
/* If the resource already points at a different inode, it must be one
 * that the VFS is in the middle of freeing; otherwise two live inodes
 * would claim the same lock resource. */
140 if (lock->l_resource->lr_lvb_inode &&
141 lock->l_resource->lr_lvb_inode != data) {
142 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
143 LASSERTF(old_inode->i_state & I_FREEING,
144 "Found existing inode %p/%lu/%u state %lu in lock: "
145 "setting data to %p/%lu/%u\n", old_inode,
146 old_inode->i_ino, old_inode->i_generation,
148 new_inode, new_inode->i_ino, new_inode->i_generation);
151 lock->l_resource->lr_lvb_inode = new_inode;
153 *bits = lock->l_policy_data.l_inodebits.bits;
155 unlock_res_and_lock(lock);
/* Search the export's namespace for a granted lock on @fid's resource
 * matching @type/@policy/@mode.  Delegates to ldlm_lock_match(); the
 * matched mode (or 0) is returned and @lockh filled on success. */
161 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
162 const struct lu_fid *fid, ldlm_type_t type,
163 ldlm_policy_data_t *policy, ldlm_mode_t mode,
164 struct lustre_handle *lockh)
166 struct ldlm_res_id res_id;
170 fid_build_reg_res_name(fid, &res_id);
171 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
172 &res_id, type, policy, mode, lockh, 0);
/* Cancel all unused locks on @fid's resource that match @policy/@mode,
 * via ldlm_cli_cancel_unused_resource().  @opaque is passed through for
 * the cancel iterator's use. */
176 int mdc_cancel_unused(struct obd_export *exp,
177 const struct lu_fid *fid,
178 ldlm_policy_data_t *policy,
180 ldlm_cancel_flags_t flags,
183 struct ldlm_res_id res_id;
184 struct obd_device *obd = class_exp2obd(exp);
189 fid_build_reg_res_name(fid, &res_id);
190 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
191 policy, mode, flags, opaque);
/* Detach the cached inode pointer (lr_lvb_inode) from @fid's ldlm
 * resource, if the resource exists, then drop the resource reference. */
195 int mdc_null_inode(struct obd_export *exp,
196 const struct lu_fid *fid)
198 struct ldlm_res_id res_id;
199 struct ldlm_resource *res;
200 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
203 LASSERTF(ns != NULL, "no namespace passed\n");
205 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create flag 0): a missing resource means nothing cached. */
207 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
212 res->lr_lvb_inode = NULL;
215 ldlm_resource_putref(res);
219 /* find any ldlm lock of the inode in mdc
/* Iterate locks on @fid's resource with callback @it/@data via
 * ldlm_resource_iterate().  The ITER_STOP / ITER_CONTINUE results are
 * translated to this function's return value (exact values are on lines
 * not visible in this chunk). */
223 int mdc_find_cbdata(struct obd_export *exp,
224 const struct lu_fid *fid,
225 ldlm_iterator_t it, void *data)
227 struct ldlm_res_id res_id;
231 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
232 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
234 if (rc == LDLM_ITER_STOP)
236 else if (rc == LDLM_ITER_CONTINUE)
/* Drop the rq_replay flag on a request that completed with error @rc so
 * the failed request is not replayed after recovery; complain if the
 * server assigned a transaction number to an error reply. */
241 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
243 /* Don't hold error requests for replay. */
244 if (req->rq_replay) {
245 spin_lock(&req->rq_lock);
247 spin_unlock(&req->rq_lock);
/* A nonzero transno on an error reply is unexpected: flag it loudly. */
249 if (rc && req->rq_transno != 0) {
250 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
255 /* Save a large LOV EA into the request buffer so that it is available
256 * for replay. We don't do this in the initial request because the
257 * original request doesn't need this buffer (at most it sends just the
258 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
259 * buffer and may also be difficult to allocate and save a very large
260 * request buffer for each open. (bug 5707)
262 * OOM here may cause recovery failure if lmm is needed (only for the
263 * original open if the MDS crashed just when this client also OOM'd)
264 * but this is incredibly unlikely, and questionable whether the client
265 * could do MDS recovery under OOM anyways... */
266 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
267 struct mdt_body *body)
271 /* FIXME: remove this explicit offset. */
272 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
/* If the buffer cannot be enlarged, degrade gracefully: replay will
 * simply proceed without the EA (FLEASIZE stripped, size zeroed). */
275 CERROR("Can't enlarge segment %d size to %d\n",
276 DLM_INTENT_REC_OFF + 4, body->eadatasize);
277 body->valid &= ~OBD_MD_FLEASIZE;
278 body->eadatasize = 0;
/* Build an LDLM_INTENT_OPEN request: gather cancels for conflicting OPEN
 * locks on the known child fid (and the parent's UPDATE lock when
 * creating), size the name/EA/capa buffers, mark the request replayable,
 * and pack the ldlm intent plus the open body.  Returns the prepared
 * request or an ERR_PTR on allocation failure. */
282 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
283 struct lookup_intent *it,
284 struct md_op_data *op_data,
285 void *lmm, int lmmsize,
288 struct ptlrpc_request *req;
289 struct obd_device *obddev = class_exp2obd(exp);
290 struct ldlm_intent *lit;
291 CFS_LIST_HEAD(cancels);
/* Force a regular-file type into the create mode bits. */
297 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
299 /* XXX: openlock is not cancelled for cross-refs. */
300 /* If inode is known, cancel conflicting OPEN locks. */
301 if (fid_is_sane(&op_data->op_fid2)) {
302 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
303 if (it->it_flags & FMODE_WRITE)
308 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
311 else if (it->it_flags & FMODE_EXEC)
317 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
322 /* If CREATE, cancel parent's UPDATE lock. */
323 if (it->it_op & IT_CREAT)
327 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
329 MDS_INODELOCK_UPDATE);
331 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
332 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the collected cancel list before bailing. */
334 ldlm_lock_list_put(&cancels, l_bl_ast, count);
335 RETURN(ERR_PTR(-ENOMEM));
338 /* parent capability */
339 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
340 /* child capability, reserve the size according to parent capa, it will
341 * be filled after we get the reply */
342 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
344 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
345 op_data->op_namelen + 1);
346 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
347 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
349 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
351 ptlrpc_request_free(req);
/* Opens are replayed unconditionally if the import supports replay. */
355 spin_lock(&req->rq_lock);
356 req->rq_replay = req->rq_import->imp_replayable;
357 spin_unlock(&req->rq_lock);
359 /* pack the intent */
360 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
361 lit->opc = (__u64)it->it_op;
363 /* pack the intended request */
364 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
367 /* for remote client, fetch remote perm for current user */
368 if (client_is_remote(exp))
369 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
370 sizeof(struct mdt_remote_perm));
371 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETXATTR request.  The reply EA/value/lengths
 * buffers are all sized to the server-advertised maximum EA size
 * (ocd_max_easize) since the xattr sizes are unknown in advance. */
375 static struct ptlrpc_request *
376 mdc_intent_getxattr_pack(struct obd_export *exp,
377 struct lookup_intent *it,
378 struct md_op_data *op_data)
380 struct ptlrpc_request *req;
381 struct ldlm_intent *lit;
382 int rc, count = 0, maxdata;
383 CFS_LIST_HEAD(cancels);
387 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
388 &RQF_LDLM_INTENT_GETXATTR);
390 RETURN(ERR_PTR(-ENOMEM));
392 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
394 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
396 ptlrpc_request_free(req);
400 /* pack the intent */
401 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
402 lit->opc = IT_GETXATTR;
404 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
406 /* pack the intended request */
407 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
408 op_data->op_valid, maxdata, -1, 0);
410 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
411 RCL_SERVER, maxdata);
413 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
414 RCL_SERVER, maxdata);
416 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
417 RCL_SERVER, maxdata);
419 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_UNLINK request: pack the intent opcode and the
 * unlink body, reserving reply space for the victim's stripe MD and
 * unlink cookies at the client's current maximums. */
424 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
425 struct lookup_intent *it,
426 struct md_op_data *op_data)
428 struct ptlrpc_request *req;
429 struct obd_device *obddev = class_exp2obd(exp);
430 struct ldlm_intent *lit;
434 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
435 &RQF_LDLM_INTENT_UNLINK);
437 RETURN(ERR_PTR(-ENOMEM));
439 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
440 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
441 op_data->op_namelen + 1);
443 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
445 ptlrpc_request_free(req);
449 /* pack the intent */
450 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
451 lit->opc = (__u64)it->it_op;
453 /* pack the intended request */
454 mdc_unlink_pack(req, op_data);
456 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
457 obddev->u.cli.cl_max_mds_easize);
458 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
459 obddev->u.cli.cl_max_mds_cookiesize);
460 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETATTR request asking for the full set of
 * attribute data (getattr + EA + capa + MEA, and remote perms or ACLs
 * depending on whether the client is remote), with reply buffers sized
 * to the client's current EA maximum. */
464 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
465 struct lookup_intent *it,
466 struct md_op_data *op_data)
468 struct ptlrpc_request *req;
469 struct obd_device *obddev = class_exp2obd(exp);
470 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
471 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
472 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
473 (client_is_remote(exp) ?
474 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
475 struct ldlm_intent *lit;
479 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
480 &RQF_LDLM_INTENT_GETATTR);
482 RETURN(ERR_PTR(-ENOMEM));
484 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
485 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
486 op_data->op_namelen + 1);
488 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
490 ptlrpc_request_free(req);
494 /* pack the intent */
495 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
496 lit->opc = (__u64)it->it_op;
498 /* pack the intended request */
499 mdc_getattr_pack(req, valid, it->it_flags, op_data,
500 obddev->u.cli.cl_max_mds_easize);
502 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
503 obddev->u.cli.cl_max_mds_easize);
504 if (client_is_remote(exp))
505 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
506 sizeof(struct mdt_remote_perm));
507 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_LAYOUT request.  The layout intent carries the
 * generic LAYOUT_INTENT_ACCESS opcode; the server may return the layout
 * in the DLM LVB, so reserve the reply LVB buffer at the EA maximum. */
511 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
512 struct lookup_intent *it,
513 struct md_op_data *unused)
515 struct obd_device *obd = class_exp2obd(exp);
516 struct ptlrpc_request *req;
517 struct ldlm_intent *lit;
518 struct layout_intent *layout;
522 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
523 &RQF_LDLM_INTENT_LAYOUT);
525 RETURN(ERR_PTR(-ENOMEM));
527 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
528 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
530 ptlrpc_request_free(req);
534 /* pack the intent */
535 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
536 lit->opc = (__u64)it->it_op;
538 /* pack the layout intent request */
539 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
540 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
541 * set for replication */
542 layout->li_opc = LAYOUT_INTENT_ACCESS;
544 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
545 obd->u.cli.cl_max_mds_easize);
546 ptlrpc_request_set_replen(req);
/* Build a plain (intent-less) LDLM_ENQUEUE request, reserving @lvb_len
 * bytes of server LVB in the reply.  Used e.g. for IT_READDIR in
 * mdc_enqueue(), where no metadata intent body is needed. */
550 static struct ptlrpc_request *
551 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
553 struct ptlrpc_request *req;
557 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
559 RETURN(ERR_PTR(-ENOMEM));
561 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
563 ptlrpc_request_free(req);
567 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
568 ptlrpc_request_set_replen(req);
/* Post-process a completed intent enqueue: mark replayed requests as
 * intent-only, fix up the lock mode if the server granted a different
 * one, copy the disposition/status from the DLM reply into the intent,
 * drop the replay flag for failed opens, validate/stash the reply EA
 * (possibly reallocating the request buffer so the LOV EA survives for
 * replay), swab remote perms and capabilities, and for IT_LAYOUT install
 * the returned LVB layout data on the lock. */
572 static int mdc_finish_enqueue(struct obd_export *exp,
573 struct ptlrpc_request *req,
574 struct ldlm_enqueue_info *einfo,
575 struct lookup_intent *it,
576 struct lustre_handle *lockh,
579 struct req_capsule *pill = &req->rq_pill;
580 struct ldlm_request *lockreq;
581 struct ldlm_reply *lockrep;
582 struct lustre_intent_data *intent = &it->d.lustre;
583 struct ldlm_lock *lock;
584 void *lvb_data = NULL;
589 /* Similarly, if we're going to replay this request, we don't want to
590 * actually get a lock, just perform the intent. */
591 if (req->rq_transno || req->rq_replay) {
592 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
593 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* ABORTED means the intent ran but no lock was granted: clear lockh. */
596 if (rc == ELDLM_LOCK_ABORTED) {
598 memset(lockh, 0, sizeof(*lockh));
600 } else { /* rc = 0 */
601 lock = ldlm_handle2lock(lockh);
602 LASSERT(lock != NULL);
604 /* If the server gave us back a different lock mode, we should
605 * fix up our variables. */
606 if (lock->l_req_mode != einfo->ei_mode) {
607 ldlm_lock_addref(lockh, lock->l_req_mode);
608 ldlm_lock_decref(lockh, einfo->ei_mode);
609 einfo->ei_mode = lock->l_req_mode;
614 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
615 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Publish the server's verdict into the intent for the upper layers. */
617 intent->it_disposition = (int)lockrep->lock_policy_res1;
618 intent->it_status = (int)lockrep->lock_policy_res2;
619 intent->it_lock_mode = einfo->ei_mode;
620 intent->it_lock_handle = lockh->cookie;
621 intent->it_data = req;
623 /* Technically speaking rq_transno must already be zero if
624 * it_status is in error, so the check is a bit redundant */
625 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
626 mdc_clear_replay_flag(req, intent->it_status);
628 /* If we're doing an IT_OPEN which did not result in an actual
629 * successful open, then we need to remove the bit which saves
630 * this request for unconditional replay.
632 * It's important that we do this first! Otherwise we might exit the
633 * function without doing so, and try to replay a failed create
635 if (it->it_op & IT_OPEN && req->rq_replay &&
636 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
637 mdc_clear_replay_flag(req, intent->it_status);
639 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
640 it->it_op, intent->it_disposition, intent->it_status);
642 /* We know what to expect, so we do any byte flipping required here */
643 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
644 struct mdt_body *body;
646 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
648 CERROR ("Can't swab mdt_body\n");
652 if (it_disposition(it, DISP_OPEN_OPEN) &&
653 !it_open_error(DISP_OPEN_OPEN, it)) {
655 * If this is a successful OPEN request, we need to set
656 * replay handler and data early, so that if replay
657 * happens immediately after swabbing below, new reply
658 * is swabbed by that handler correctly.
660 mdc_set_open_replay_data(NULL, NULL, it);
663 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
666 mdc_update_max_ea_from_body(exp, body);
669 * The eadata is opaque; just check that it is there.
670 * Eventually, obd_unpackmd() will check the contents.
672 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
677 /* save lvb data and length in case this is for layout
680 lvb_len = body->eadatasize;
683 * We save the reply LOV EA in case we have to replay a
684 * create for recovery. If we didn't allocate a large
685 * enough request buffer above we need to reallocate it
686 * here to hold the actual LOV EA.
688 * To not save LOV EA if request is not going to replay
689 * (for example error one).
691 if ((it->it_op & IT_OPEN) && req->rq_replay) {
693 if (req_capsule_get_size(pill, &RMF_EADATA,
696 mdc_realloc_openmsg(req, body);
698 req_capsule_shrink(pill, &RMF_EADATA,
702 req_capsule_set_size(pill, &RMF_EADATA,
706 lmm = req_capsule_client_get(pill, &RMF_EADATA);
708 memcpy(lmm, eadata, body->eadatasize);
712 if (body->valid & OBD_MD_FLRMTPERM) {
713 struct mdt_remote_perm *perm;
715 LASSERT(client_is_remote(exp));
716 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
717 lustre_swab_mdt_remote_perm);
721 if (body->valid & OBD_MD_FLMDSCAPA) {
722 struct lustre_capa *capa, *p;
724 capa = req_capsule_server_get(pill, &RMF_CAPA1);
728 if (it->it_op & IT_OPEN) {
729 /* client fid capa will be checked in replay */
730 p = req_capsule_client_get(pill, &RMF_CAPA2);
735 if (body->valid & OBD_MD_FLOSSCAPA) {
736 struct lustre_capa *capa;
738 capa = req_capsule_server_get(pill, &RMF_CAPA2);
742 } else if (it->it_op & IT_LAYOUT) {
743 /* maybe the lock was granted right away and layout
744 * is packed into RMF_DLM_LVB of req */
745 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
747 lvb_data = req_capsule_server_sized_get(pill,
748 &RMF_DLM_LVB, lvb_len);
749 if (lvb_data == NULL)
754 /* fill in stripe data for layout lock */
755 lock = ldlm_handle2lock(lockh);
756 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
759 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
760 ldlm_it2str(it->it_op), lvb_len);
/* Copy the layout into a private buffer and install it as the lock's
 * LVB only if no LVB is attached yet; otherwise free our copy. */
762 OBD_ALLOC_LARGE(lmm, lvb_len);
767 memcpy(lmm, lvb_data, lvb_len);
769 /* install lvb_data */
770 lock_res_and_lock(lock);
771 if (lock->l_lvb_data == NULL) {
772 lock->l_lvb_data = lmm;
773 lock->l_lvb_len = lvb_len;
776 unlock_res_and_lock(lock);
778 OBD_FREE_LARGE(lmm, lvb_len);
786 /* We always reserve enough space in the reply packet for a stripe MD, because
787 * we don't know in advance the file type. */
/* Enqueue an ldlm lock with an optional metadata intent:
 *  - choose the inodebits policy from the intent op (UPDATE for
 *    unlink/getattr/readdir, LAYOUT, XATTR, else LOOKUP);
 *  - pack the per-op intent request (open/unlink/getattr/readdir/
 *    layout/getxattr), or treat @lmm as a flock policy when there is
 *    no intent;
 *  - serialize via the mdc rpc_lock and in-flight counter, then call
 *    ldlm_cli_enqueue();
 *  - flock -EINTR/-ETIMEDOUT on an F_UNLCK-style request returns early
 *    (see comment below);
 *  - retry IT_CREAT forever on server -EINPROGRESS (quota design),
 *    giving up only across an import generation change (eviction);
 *  - finish via mdc_finish_enqueue(); on its failure drop the lock ref
 *    and the request. */
788 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
789 struct lookup_intent *it, struct md_op_data *op_data,
790 struct lustre_handle *lockh, void *lmm, int lmmsize,
791 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
793 struct obd_device *obddev = class_exp2obd(exp);
794 struct ptlrpc_request *req = NULL;
795 __u64 flags, saved_flags = extra_lock_flags;
797 struct ldlm_res_id res_id;
798 static const ldlm_policy_data_t lookup_policy =
799 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
800 static const ldlm_policy_data_t update_policy =
801 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
802 static const ldlm_policy_data_t layout_policy =
803 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
804 static const ldlm_policy_data_t getxattr_policy = {
805 .l_inodebits = { MDS_INODELOCK_XATTR } };
806 ldlm_policy_data_t const *policy = &lookup_policy;
807 int generation, resends = 0;
808 struct ldlm_reply *lockrep;
809 enum lvb_type lvb_type = 0;
812 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
815 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
818 saved_flags |= LDLM_FL_HAS_INTENT;
819 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
820 policy = &update_policy;
821 else if (it->it_op & IT_LAYOUT)
822 policy = &layout_policy;
823 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
824 policy = &getxattr_policy;
827 LASSERT(reqp == NULL);
/* Remember the import generation so a resend loop can detect eviction. */
829 generation = obddev->u.cli.cl_import->imp_generation;
833 /* The only way right now is FLOCK, in this case we hide flock
834 policy as lmm, but lmmsize is 0 */
835 LASSERT(lmm && lmmsize == 0);
836 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
838 policy = (ldlm_policy_data_t *)lmm;
839 res_id.name[3] = LDLM_FLOCK;
840 } else if (it->it_op & IT_OPEN) {
841 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
843 policy = &update_policy;
844 einfo->ei_cbdata = NULL;
846 } else if (it->it_op & IT_UNLINK) {
847 req = mdc_intent_unlink_pack(exp, it, op_data);
848 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
849 req = mdc_intent_getattr_pack(exp, it, op_data);
850 } else if (it->it_op & IT_READDIR) {
851 req = mdc_enqueue_pack(exp, 0);
852 } else if (it->it_op & IT_LAYOUT) {
853 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
855 req = mdc_intent_layout_pack(exp, it, op_data);
856 lvb_type = LVB_T_LAYOUT;
857 } else if (it->it_op & IT_GETXATTR) {
858 req = mdc_intent_getxattr_pack(exp, it, op_data);
865 RETURN(PTR_ERR(req));
867 if (req != NULL && it && it->it_op & IT_CREAT)
868 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
870 req->rq_no_retry_einprogress = 1;
873 req->rq_generation_set = 1;
874 req->rq_import_generation = generation;
875 req->rq_sent = cfs_time_current_sec() + resends;
878 /* It is important to obtain rpc_lock first (if applicable), so that
879 * threads that are serialised with rpc_lock are not polluting our
880 * rpcs in flight counter. We do not do flock request limiting, though*/
882 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
883 rc = mdc_enter_request(&obddev->u.cli);
885 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
886 mdc_clear_replay_flag(req, 0);
887 ptlrpc_req_finished(req);
892 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
893 0, lvb_type, lockh, 0);
895 /* For flock requests we immediatelly return without further
896 delay and let caller deal with the rest, since rest of
897 this function metadata processing makes no sense for flock
898 requests anyway. But in case of problem during comms with
899 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
900 can not rely on caller and this mainly for F_UNLCKs
901 (explicits or automatically generated by Kernel to clean
902 current FLocks upon exit) that can't be trashed */
903 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
904 (einfo->ei_type == LDLM_FLOCK) &&
905 (einfo->ei_mode == LCK_NL))
/* Release the in-flight slot and rpc_lock taken above. */
910 mdc_exit_request(&obddev->u.cli);
911 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
914 CERROR("ldlm_cli_enqueue: %d\n", rc);
915 mdc_clear_replay_flag(req, rc);
916 ptlrpc_req_finished(req);
920 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
921 LASSERT(lockrep != NULL);
/* Convert the server status in the DLM reply to host error numbering. */
923 lockrep->lock_policy_res2 =
924 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
926 /* Retry the create infinitely when we get -EINPROGRESS from
927 * server. This is required by the new quota design. */
928 if (it && it->it_op & IT_CREAT &&
929 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
930 mdc_clear_replay_flag(req, rc);
931 ptlrpc_req_finished(req);
934 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
935 obddev->obd_name, resends, it->it_op,
936 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
938 if (generation == obddev->u.cli.cl_import->imp_generation) {
941 CDEBUG(D_HA, "resend cross eviction\n");
946 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* finish_enqueue failed: give back the granted lock and the request. */
948 if (lustre_handle_is_used(lockh)) {
949 ldlm_lock_decref(lockh, einfo->ei_mode);
950 memset(lockh, 0, sizeof(*lockh));
952 ptlrpc_req_finished(req);
/* Translate the completed intent's dispositions into the final result
 * for the VFS layer: propagate server errors per phase, detect stale
 * revalidation (returned fid differs from both op_fid2 and op_fid3),
 * take extra request references for successful CREATE/OPEN so the
 * llite layer can consume the reply later, and if an equivalent lock
 * already exists, cancel the newly granted one and reuse the old. */
957 static int mdc_finish_intent_lock(struct obd_export *exp,
958 struct ptlrpc_request *request,
959 struct md_op_data *op_data,
960 struct lookup_intent *it,
961 struct lustre_handle *lockh)
963 struct lustre_handle old_lock;
964 struct mdt_body *mdt_body;
965 struct ldlm_lock *lock;
969 LASSERT(request != NULL);
970 LASSERT(request != LP_POISON);
971 LASSERT(request->rq_repmsg != LP_POISON);
973 if (!it_disposition(it, DISP_IT_EXECD)) {
974 /* The server failed before it even started executing the
975 * intent, i.e. because it couldn't unpack the request. */
976 LASSERT(it->d.lustre.it_status != 0);
977 RETURN(it->d.lustre.it_status);
979 rc = it_open_error(DISP_IT_EXECD, it);
983 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
984 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
986 /* If we were revalidating a fid/name pair, mark the intent in
987 * case we fail and get called again from lookup */
988 if (fid_is_sane(&op_data->op_fid2) &&
989 it->it_create_mode & M_CHECK_STALE &&
990 it->it_op != IT_GETATTR) {
991 it_set_disposition(it, DISP_ENQ_COMPLETE);
993 /* Also: did we find the same inode? */
994 /* sever can return one of two fids:
995 * op_fid2 - new allocated fid - if file is created.
996 * op_fid3 - existent fid - if file only open.
997 * op_fid3 is saved in lmv_intent_open */
998 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
999 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
1000 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
1001 "\n", PFID(&op_data->op_fid2),
1002 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
1007 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1011 /* keep requests around for the multiple phases of the call
1012 * this shows the DISP_XX must guarantee we make it into the call
1014 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1015 it_disposition(it, DISP_OPEN_CREATE) &&
1016 !it_open_error(DISP_OPEN_CREATE, it)) {
1017 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1018 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1020 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1021 it_disposition(it, DISP_OPEN_OPEN) &&
1022 !it_open_error(DISP_OPEN_OPEN, it)) {
1023 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1024 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1025 /* BUG 11546 - eviction in the middle of open rpc processing */
1026 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1029 if (it->it_op & IT_CREAT) {
1030 /* XXX this belongs in ll_create_it */
1031 } else if (it->it_op == IT_OPEN) {
1032 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1034 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1037 /* If we already have a matching lock, then cancel the new
1038 * one. We have to set the data here instead of in
1039 * mdc_enqueue, because we need to use the child's inode as
1040 * the l_ast_data to match, and that's not available until
1041 * intent_finish has performed the iget().) */
1042 lock = ldlm_handle2lock(lockh);
1044 ldlm_policy_data_t policy = lock->l_policy_data;
1045 LDLM_DEBUG(lock, "matching against this");
/* Sanity: the granted lock's resource must correspond to the fid the
 * server reported in the mdt_body. */
1047 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1048 &lock->l_resource->lr_name),
1049 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1050 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1051 LDLM_LOCK_PUT(lock);
1053 memcpy(&old_lock, lockh, sizeof(*lockh));
1054 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1055 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* A pre-existing equivalent lock was found: drop the new one and point
 * the intent at the old lock's handle instead. */
1056 ldlm_lock_decref_and_cancel(lockh,
1057 it->d.lustre.it_lock_mode);
1058 memcpy(lockh, &old_lock, sizeof(old_lock));
1059 it->d.lustre.it_lock_handle = lockh->cookie;
1062 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1063 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1064 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Check whether the client still holds a lock covering the intent on
 * @fid.  First try to revalidate the handle cached in the intent; failing
 * that, match a granted IBITS lock in the namespace with a policy chosen
 * by the intent op (full UPDATE|LOOKUP|... set for getattr-style ops,
 * LAYOUT, or plain LOOKUP).  On a match the intent's lock handle/mode
 * are refreshed; otherwise they are zeroed. */
1068 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1069 struct lu_fid *fid, __u64 *bits)
1071 /* We could just return 1 immediately, but since we should only
1072 * be called in revalidate_it if we already have a lock, let's
1074 struct ldlm_res_id res_id;
1075 struct lustre_handle lockh;
1076 ldlm_policy_data_t policy;
1080 if (it->d.lustre.it_lock_handle) {
1081 lockh.cookie = it->d.lustre.it_lock_handle;
1082 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1084 fid_build_reg_res_name(fid, &res_id);
1085 switch (it->it_op) {
1087 /* File attributes are held under multiple bits:
1088 * nlink is under lookup lock, size and times are
1089 * under UPDATE lock and recently we've also got
1090 * a separate permissions lock for owner/group/acl that
1091 * were protected by lookup lock before.
1092 * Getattr must provide all of that information,
1093 * so we need to ensure we have all of those locks.
1094 * Unfortunately, if the bits are split across multiple
1095 * locks, there's no easy way to match all of them here,
1096 * so an extra RPC would be performed to fetch all
1097 * of those bits at once for now. */
1098 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1099 MDS_INODELOCK_LOOKUP |
1103 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1106 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Accept any granted mode that could satisfy the caller. */
1110 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1111 LDLM_FL_BLOCK_GRANTED, &res_id,
1112 LDLM_IBITS, &policy,
1113 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1117 it->d.lustre.it_lock_handle = lockh.cookie;
1118 it->d.lustre.it_lock_mode = mode;
1120 it->d.lustre.it_lock_handle = 0;
1121 it->d.lustre.it_lock_mode = 0;
1128 * This long block is all about fixing up the lock and request state
1129 * so that it is correct as of the moment _before_ the operation was
1130 * applied; that way, the VFS will think that everything is normal and
1131 * call Lustre's regular VFS methods.
1133 * If we're performing a creation, that means that unless the creation
1134 * failed with EEXIST, we should fake up a negative dentry.
1136 * For everything else, we want to lookup to succeed.
1138 * One additional note: if CREATE or OPEN succeeded, we add an extra
1139 * reference to the request because we need to keep it around until
1140 * ll_create/ll_open gets called.
1142 * The server will return to us, in it_disposition, an indication of
1143 * exactly what d.lustre.it_status refers to.
1145 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1146 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1147 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1148 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1151 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1154 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1155 void *lmm, int lmmsize, struct lookup_intent *it,
1156 int lookup_flags, struct ptlrpc_request **reqp,
1157 ldlm_blocking_callback cb_blocking,
1158 __u64 extra_lock_flags)
1160 struct lustre_handle lockh;
1165 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1166 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1167 op_data->op_name, PFID(&op_data->op_fid2),
1168 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Fast path: for LOOKUP/GETATTR on a known fid, try revalidating an
 * existing lock before sending any RPC. */
1172 if (fid_is_sane(&op_data->op_fid2) &&
1173 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1174 /* We could just return 1 immediately, but since we should only
1175 * be called in revalidate_it if we already have a lock, let's
1177 it->d.lustre.it_lock_handle = 0;
1178 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1179 /* Only return failure if it was not GETATTR by cfid
1180 (from inode_revalidate) */
1181 if (rc || op_data->op_namelen != 0)
1185 /* lookup_it may be called only after revalidate_it has run, because
1186 * revalidate_it cannot return errors, only zero. Returning zero causes
1187 * this call to lookup, which *can* return an error.
1189 * We only want to execute the request associated with the intent one
1190 * time, however, so don't send the request again. Instead, skip past
1191 * this and use the request from revalidate. In this case, revalidate
1192 * never dropped its reference, so the refcounts are all OK */
1193 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1194 struct ldlm_enqueue_info einfo = {
1195 .ei_type = LDLM_IBITS,
1196 .ei_mode = it_to_lock_mode(it),
1197 .ei_cb_bl = cb_blocking,
1198 .ei_cb_cp = ldlm_completion_ast,
1201 /* For case if upper layer did not alloc fid, do it now. */
1202 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1203 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1205 CERROR("Can't alloc new fid, rc %d\n", rc);
1209 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1210 lmm, lmmsize, NULL, extra_lock_flags);
1213 } else if (!fid_is_sane(&op_data->op_fid2) ||
1214 !(it->it_create_mode & M_CHECK_STALE)) {
1215 /* DISP_ENQ_COMPLETE set means there is extra reference on
1216 * request referenced from this intent, saved for subsequent
1217 * lookup. This path is executed when we proceed to this
1218 * lookup, so we clear DISP_ENQ_COMPLETE */
1219 it_clear_disposition(it, DISP_ENQ_COMPLETE);
/* Hand the saved intent request back to the caller and finish. */
1221 *reqp = it->d.lustre.it_data;
1222 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1226 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1227 struct ptlrpc_request *req,
1230 struct mdc_getattr_args *ga = args;
1231 struct obd_export *exp = ga->ga_exp;
1232 struct md_enqueue_info *minfo = ga->ga_minfo;
1233 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1234 struct lookup_intent *it;
1235 struct lustre_handle *lockh;
1236 struct obd_device *obddev;
1237 struct ldlm_reply *lockrep;
1238 __u64 flags = LDLM_FL_HAS_INTENT;
1242 lockh = &minfo->mi_lockh;
1244 obddev = class_exp2obd(exp);
1246 mdc_exit_request(&obddev->u.cli);
1247 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1250 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1251 &flags, NULL, 0, lockh, rc);
1253 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1254 mdc_clear_replay_flag(req, rc);
1258 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1259 LASSERT(lockrep != NULL);
1261 lockrep->lock_policy_res2 =
1262 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1264 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1268 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1272 OBD_FREE_PTR(einfo);
1273 minfo->mi_cb(req, minfo, rc);
1277 int mdc_intent_getattr_async(struct obd_export *exp,
1278 struct md_enqueue_info *minfo,
1279 struct ldlm_enqueue_info *einfo)
1281 struct md_op_data *op_data = &minfo->mi_data;
1282 struct lookup_intent *it = &minfo->mi_it;
1283 struct ptlrpc_request *req;
1284 struct mdc_getattr_args *ga;
1285 struct obd_device *obddev = class_exp2obd(exp);
1286 struct ldlm_res_id res_id;
1287 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1288 * for statahead currently. Consider CMD in future, such two bits
1289 * maybe managed by different MDS, should be adjusted then. */
1290 ldlm_policy_data_t policy = {
1291 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1292 MDS_INODELOCK_UPDATE }
1295 __u64 flags = LDLM_FL_HAS_INTENT;
1298 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1299 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1300 ldlm_it2str(it->it_op), it->it_flags);
1302 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1303 req = mdc_intent_getattr_pack(exp, it, op_data);
1305 RETURN(PTR_ERR(req));
1307 rc = mdc_enter_request(&obddev->u.cli);
1309 ptlrpc_req_finished(req);
1313 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1314 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1316 mdc_exit_request(&obddev->u.cli);
1317 ptlrpc_req_finished(req);
1321 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1322 ga = ptlrpc_req_async_args(req);
1324 ga->ga_minfo = minfo;
1325 ga->ga_einfo = einfo;
1327 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1328 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);