4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
43 # include <liblustre.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
/*
 * Per-request arguments of the asynchronous getattr intent.
 * mdc_intent_getattr_async() stores them in the area returned by
 * ptlrpc_req_async_args() and mdc_intent_getattr_async_interpret() reads
 * them back when the reply is interpreted.
 */
55 struct mdc_getattr_args {
56 struct obd_export *ga_exp;
57 struct md_enqueue_info *ga_minfo;
58 struct ldlm_enqueue_info *ga_einfo;
/*
 * Test disposition bits of an intent.
 * Returns the bits of it->d.lustre.it_disposition that are also set in
 * flag, so the result is non-zero when at least one requested bit is set.
 */
61 int it_disposition(struct lookup_intent *it, int flag)
63 return it->d.lustre.it_disposition & flag;
65 EXPORT_SYMBOL(it_disposition);
/* Set the bits given in flag in it->d.lustre.it_disposition. */
67 void it_set_disposition(struct lookup_intent *it, int flag)
69 it->d.lustre.it_disposition |= flag;
71 EXPORT_SYMBOL(it_set_disposition);
/* Clear the bits given in flag from it->d.lustre.it_disposition. */
73 void it_clear_disposition(struct lookup_intent *it, int flag)
75 it->d.lustre.it_disposition &= ~flag;
77 EXPORT_SYMBOL(it_clear_disposition);
/*
 * Report the intent status that applies to a given phase.
 *
 * The dispositions are examined in the order DISP_OPEN_LEASE,
 * DISP_OPEN_OPEN, DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.
 * For a disposition that is set on the intent, it->d.lustre.it_status is
 * returned when phase is at or beyond that disposition.  When no
 * disposition matches, the disposition and status are logged with CERROR.
 *
 * NOTE(review): the else branches and everything after the CERROR are
 * missing from this listing; presumably 0 is returned when phase is below
 * a set disposition - confirm against the full source.
 */
79 int it_open_error(int phase, struct lookup_intent *it)
81 if (it_disposition(it, DISP_OPEN_LEASE)) {
82 if (phase >= DISP_OPEN_LEASE)
83 return it->d.lustre.it_status;
87 if (it_disposition(it, DISP_OPEN_OPEN)) {
88 if (phase >= DISP_OPEN_OPEN)
89 return it->d.lustre.it_status;
94 if (it_disposition(it, DISP_OPEN_CREATE)) {
95 if (phase >= DISP_OPEN_CREATE)
96 return it->d.lustre.it_status;
101 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
102 if (phase >= DISP_LOOKUP_EXECD)
103 return it->d.lustre.it_status;
108 if (it_disposition(it, DISP_IT_EXECD)) {
109 if (phase >= DISP_IT_EXECD)
110 return it->d.lustre.it_status;
114 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
115 it->d.lustre.it_status);
119 EXPORT_SYMBOL(it_open_error);
/*
 * Attach an inode to the resource behind a lock handle.
 *
 * Resolves lockh to a lock (asserted to be non-NULL) and, with the
 * resource and lock held:
 *  - if the resource already references a different inode, asserts that
 *    the old inode is in the I_FREEING state;
 *  - stores data as lr_lvb_inode of the resource;
 *  - copies the inodebits of the lock into *bits.
 *
 * NOTE(review): this listing omits some lines (the last parameter, any
 * NULL guards around data and bits, and the release of the reference
 * taken by ldlm_handle2lock); confirm them against the full source.
 */
121 /* this must be called on a lockh that is known to have a referenced lock */
122 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
125 struct ldlm_lock *lock;
126 struct inode *new_inode = data;
135 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
137 LASSERT(lock != NULL);
138 lock_res_and_lock(lock);
140 if (lock->l_resource->lr_lvb_inode &&
141 lock->l_resource->lr_lvb_inode != data) {
142 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
143 LASSERTF(old_inode->i_state & I_FREEING,
144 "Found existing inode %p/%lu/%u state %lu in lock: "
145 "setting data to %p/%lu/%u\n", old_inode,
146 old_inode->i_ino, old_inode->i_generation,
148 new_inode, new_inode->i_ino, new_inode->i_generation);
151 lock->l_resource->lr_lvb_inode = new_inode;
153 *bits = lock->l_policy_data.l_inodebits.bits;
155 unlock_res_and_lock(lock);
/*
 * Look for an existing lock on the resource named by fid.
 *
 * Builds the resource name from fid and forwards flags, type, policy, mode
 * and lockh to ldlm_lock_match() on the namespace of the obd device
 * behind exp.
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably the result of ldlm_lock_match() is returned - confirm.
 */
161 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
162 const struct lu_fid *fid, ldlm_type_t type,
163 ldlm_policy_data_t *policy, ldlm_mode_t mode,
164 struct lustre_handle *lockh)
166 struct ldlm_res_id res_id;
170 fid_build_reg_res_name(fid, &res_id);
171 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
172 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel unused locks on the resource named by fid.
 *
 * Builds the resource name from fid and calls
 * ldlm_cli_cancel_unused_resource() on the namespace of the obd device
 * behind exp, passing policy, mode, flags and opaque through.
 *
 * NOTE(review): two parameters (presumably mode and opaque) and the
 * return statement are missing from this listing - confirm.
 */
176 int mdc_cancel_unused(struct obd_export *exp,
177 const struct lu_fid *fid,
178 ldlm_policy_data_t *policy,
180 ldlm_cancel_flags_t flags,
183 struct ldlm_res_id res_id;
184 struct obd_device *obd = class_exp2obd(exp);
189 fid_build_reg_res_name(fid, &res_id);
190 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
191 policy, mode, flags, opaque);
/*
 * Detach the inode from the resource named by fid.
 *
 * Asserts that the export has a namespace, looks the resource up with
 * ldlm_resource_get(), resets its lr_lvb_inode pointer to NULL and drops
 * the resource reference again.
 *
 * NOTE(review): the handling of a failed lookup and any locking around
 * the assignment are not visible in this listing - confirm.
 */
195 int mdc_null_inode(struct obd_export *exp,
196 const struct lu_fid *fid)
198 struct ldlm_res_id res_id;
199 struct ldlm_resource *res;
200 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
203 LASSERTF(ns != NULL, "no namespace passed\n");
205 fid_build_reg_res_name(fid, &res_id);
207 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
212 res->lr_lvb_inode = NULL;
215 ldlm_resource_putref(res);
/*
 * mdc_find_cbdata: run the iterator callback it, with data as its
 * argument, over the locks of the resource named by fid, using
 * ldlm_resource_iterate() on the namespace of the obd device behind exp.
 * The iteration result is then compared against LDLM_ITER_STOP and
 * LDLM_ITER_CONTINUE.
 *
 * NOTE(review): the values returned for the two outcomes are missing from
 * this listing - confirm against the full source.
 */
219 /* find any ldlm lock of the inode in mdc
223 int mdc_find_cbdata(struct obd_export *exp,
224 const struct lu_fid *fid,
225 ldlm_iterator_t it, void *data)
227 struct ldlm_res_id res_id;
231 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
232 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
234 if (rc == LDLM_ITER_STOP)
236 else if (rc == LDLM_ITER_CONTINUE)
241 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
243 /* Don't hold error requests for replay. */
/*
 * Acts only on requests that have rq_replay set, and does so while
 * holding req->rq_lock.  Afterwards a D_ERROR message is emitted when a
 * failed request (rc != 0) still carries a transaction number.
 * NOTE(review): the statement executed between spin_lock() and
 * spin_unlock() is missing from this listing; presumably it resets
 * rq_replay - confirm against the full source.
 */
244 if (req->rq_replay) {
245 spin_lock(&req->rq_lock);
247 spin_unlock(&req->rq_lock);
249 if (rc && req->rq_transno != 0) {
250 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
255 /* Save a large LOV EA into the request buffer so that it is available
256 * for replay. We don't do this in the initial request because the
257 * original request doesn't need this buffer (at most it sends just the
258 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
259 * buffer and may also be difficult to allocate and save a very large
260 * request buffer for each open. (bug 5707)
262 * OOM here may cause recovery failure if lmm is needed (only for the
263 * original open if the MDS crashed just when this client also OOM'd)
264 * but this is incredibly unlikely, and questionable whether the client
265 * could do MDS recovery under OOM anyways... */
/*
 * mdc_realloc_openmsg: enlarge request buffer segment
 * DLM_INTENT_REC_OFF + 4 through sptlrpc_cli_enlarge_reqbuf().
 * On the failure path the problem is logged and the EA is dropped from
 * the body by clearing OBD_MD_FLEASIZE in body->valid and zeroing
 * body->eadatasize.
 *
 * NOTE(review): the size argument of the enlarge call and the test on rc
 * are missing from this listing; the CERROR suggests the size is
 * body->eadatasize - confirm.
 */
266 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
267 struct mdt_body *body)
271 /* FIXME: remove this explicit offset. */
272 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
275 CERROR("Can't enlarge segment %d size to %d\n",
276 DLM_INTENT_REC_OFF + 4, body->eadatasize);
277 body->valid &= ~OBD_MD_FLEASIZE;
278 body->eadatasize = 0;
/*
 * Build the RQF_LDLM_INTENT_OPEN request for an open intent.
 *
 * Visible steps:
 *  - force the file type bits of it->it_create_mode to S_IFREG;
 *  - if op_fid2 is sane, pick a lock mode from the open flags (lease,
 *    write/truncate, exec) and collect unused locks on op_fid2 into the
 *    cancels list with mdc_resource_get_unused();
 *  - for IT_CREAT also collect locks on the parent op_fid1, with
 *    MDS_INODELOCK_UPDATE among the arguments;
 *  - allocate the request; on allocation failure release the collected
 *    locks and return ERR_PTR(-ENOMEM);
 *  - size the capability, name and EA client buffers, the EA buffer being
 *    the larger of lmmsize and cl_default_mds_easize;
 *  - hand the request and the cancels list to ldlm_prep_enqueue_req();
 *  - copy imp_replayable of the import into rq_replay under rq_lock;
 *  - pack the intent opcode and the open body;
 *  - for remote clients reserve reply space for a struct mdt_remote_perm;
 *  - fix the reply length with ptlrpc_request_set_replen().
 *
 * NOTE(review): the last parameter, the mode assignments, the error
 * checks and the final return are missing from this listing - confirm
 * them against the full source.
 */
282 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
283 struct lookup_intent *it,
284 struct md_op_data *op_data,
285 void *lmm, int lmmsize,
288 struct ptlrpc_request *req;
289 struct obd_device *obddev = class_exp2obd(exp);
290 struct ldlm_intent *lit;
291 CFS_LIST_HEAD(cancels);
297 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
299 /* XXX: openlock is not cancelled for cross-refs. */
300 /* If inode is known, cancel conflicting OPEN locks. */
301 if (fid_is_sane(&op_data->op_fid2)) {
302 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
303 if (it->it_flags & FMODE_WRITE)
308 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
311 else if (it->it_flags & FMODE_EXEC)
317 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
322 /* If CREATE, cancel parent's UPDATE lock. */
323 if (it->it_op & IT_CREAT)
327 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
329 MDS_INODELOCK_UPDATE);
331 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
332 &RQF_LDLM_INTENT_OPEN);
334 ldlm_lock_list_put(&cancels, l_bl_ast, count);
335 RETURN(ERR_PTR(-ENOMEM));
338 /* parent capability */
339 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
340 /* child capability, reserve the size according to parent capa, it will
341 * be filled after we get the reply */
342 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
344 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
345 op_data->op_namelen + 1);
346 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
347 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
349 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
351 ptlrpc_request_free(req);
355 spin_lock(&req->rq_lock);
356 req->rq_replay = req->rq_import->imp_replayable;
357 spin_unlock(&req->rq_lock);
359 /* pack the intent */
360 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
361 lit->opc = (__u64)it->it_op;
363 /* pack the intended request */
364 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
367 /* for remote client, fetch remote perm for current user */
368 if (client_is_remote(exp))
369 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
370 sizeof(struct mdt_remote_perm));
371 ptlrpc_request_set_replen(req);
/*
 * Build the RQF_LDLM_INTENT_GETXATTR request.
 *
 * Allocates the request (ERR_PTR(-ENOMEM) on failure), sizes the parent
 * capability, prepares the enqueue with an empty cancel list (count is
 * 0), sets the intent opcode to IT_GETXATTR and packs the body for
 * op_fid1.  The reply buffers RMF_EADATA, RMF_EAVALS and RMF_EAVALS_LENS
 * are each sized to ocd_max_easize taken from the connect data of the
 * import.
 *
 * NOTE(review): the error checks and the final return are missing from
 * this listing; presumably the request is returned - confirm.
 */
375 static struct ptlrpc_request *
376 mdc_intent_getxattr_pack(struct obd_export *exp,
377 struct lookup_intent *it,
378 struct md_op_data *op_data)
380 struct ptlrpc_request *req;
381 struct ldlm_intent *lit;
382 int rc, count = 0, maxdata;
383 CFS_LIST_HEAD(cancels);
387 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
388 &RQF_LDLM_INTENT_GETXATTR);
390 RETURN(ERR_PTR(-ENOMEM));
392 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
394 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
396 ptlrpc_request_free(req);
400 /* pack the intent */
401 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
402 lit->opc = IT_GETXATTR;
404 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
406 /* pack the intended request */
407 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
408 op_data->op_valid, maxdata, -1, 0);
410 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
411 RCL_SERVER, maxdata);
413 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
414 RCL_SERVER, maxdata);
416 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
417 RCL_SERVER, maxdata);
419 ptlrpc_request_set_replen(req);
/*
 * Build the RQF_LDLM_INTENT_UNLINK request.
 *
 * Allocates the request (ERR_PTR(-ENOMEM) on failure), sizes the parent
 * capability and the name buffer (op_namelen + 1), prepares the enqueue
 * without a cancel list, stores it->it_op as the intent opcode and packs
 * the unlink body.  Reply space is reserved for RMF_MDT_MD
 * (cl_max_mds_easize) and for RMF_ACL (cl_max_mds_cookiesize).
 *
 * NOTE(review): the error checks and the final return are missing from
 * this listing; presumably the request is returned - confirm.
 */
424 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
425 struct lookup_intent *it,
426 struct md_op_data *op_data)
428 struct ptlrpc_request *req;
429 struct obd_device *obddev = class_exp2obd(exp);
430 struct ldlm_intent *lit;
434 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
435 &RQF_LDLM_INTENT_UNLINK);
437 RETURN(ERR_PTR(-ENOMEM));
439 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
440 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
441 op_data->op_namelen + 1);
443 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
445 ptlrpc_request_free(req);
449 /* pack the intent */
450 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
451 lit->opc = (__u64)it->it_op;
453 /* pack the intended request */
454 mdc_unlink_pack(req, op_data);
456 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
457 obddev->u.cli.cl_max_mds_easize);
458 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
459 obddev->u.cli.cl_max_mds_cookiesize);
460 ptlrpc_request_set_replen(req);
/*
 * Build the RQF_LDLM_INTENT_GETATTR request (used for getattr and lookup
 * intents by mdc_enqueue() and by mdc_intent_getattr_async()).
 *
 * The valid mask asks for attributes, EA sizes, directory EA, MDS
 * capability and MEA, plus either remote permissions (remote client) or
 * the ACL.  The function allocates the request (ERR_PTR(-ENOMEM) on
 * failure), sizes the capability and name buffers, prepares the enqueue
 * without a cancel list, stores it->it_op as the intent opcode and packs
 * the getattr body with cl_max_mds_easize.  Reply space is reserved for
 * RMF_MDT_MD and, for remote clients, for a struct mdt_remote_perm in
 * RMF_ACL.
 *
 * NOTE(review): the error checks and the final return are missing from
 * this listing; presumably the request is returned - confirm.
 */
464 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
465 struct lookup_intent *it,
466 struct md_op_data *op_data)
468 struct ptlrpc_request *req;
469 struct obd_device *obddev = class_exp2obd(exp);
470 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
471 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
472 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
473 (client_is_remote(exp) ?
474 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
475 struct ldlm_intent *lit;
479 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
480 &RQF_LDLM_INTENT_GETATTR);
482 RETURN(ERR_PTR(-ENOMEM));
484 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
485 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
486 op_data->op_namelen + 1);
488 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
490 ptlrpc_request_free(req);
494 /* pack the intent */
495 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
496 lit->opc = (__u64)it->it_op;
498 /* pack the intended request */
499 mdc_getattr_pack(req, valid, it->it_flags, op_data,
500 obddev->u.cli.cl_max_mds_easize);
502 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
503 obddev->u.cli.cl_max_mds_easize);
504 if (client_is_remote(exp))
505 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
506 sizeof(struct mdt_remote_perm));
507 ptlrpc_request_set_replen(req);
/*
 * Build the RQF_LDLM_INTENT_LAYOUT request.
 *
 * Allocates the request (ERR_PTR(-ENOMEM) on failure), sets the client
 * RMF_EADATA buffer to size 0, prepares the enqueue without a cancel
 * list, stores it->it_op as the intent opcode and sets the layout intent
 * opcode to LAYOUT_INTENT_ACCESS.  Reply space for RMF_DLM_LVB is
 * reserved with cl_max_mds_easize.  The md_op_data argument is not used
 * by the visible code.
 *
 * NOTE(review): the error checks and the final return are missing from
 * this listing; presumably the request is returned - confirm.
 */
511 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
512 struct lookup_intent *it,
513 struct md_op_data *unused)
515 struct obd_device *obd = class_exp2obd(exp);
516 struct ptlrpc_request *req;
517 struct ldlm_intent *lit;
518 struct layout_intent *layout;
522 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
523 &RQF_LDLM_INTENT_LAYOUT);
525 RETURN(ERR_PTR(-ENOMEM));
527 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
528 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
530 ptlrpc_request_free(req);
534 /* pack the intent */
535 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
536 lit->opc = (__u64)it->it_op;
538 /* pack the layout intent request */
539 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
540 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
541 * set for replication */
542 layout->li_opc = LAYOUT_INTENT_ACCESS;
544 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
545 obd->u.cli.cl_max_mds_easize);
546 ptlrpc_request_set_replen(req);
/*
 * Build a plain RQF_LDLM_ENQUEUE request without an intent body.
 *
 * Allocates the request (ERR_PTR(-ENOMEM) on failure), prepares the
 * enqueue without a cancel list and reserves lvb_len bytes of reply space
 * for RMF_DLM_LVB.
 *
 * NOTE(review): the error checks and the final return are missing from
 * this listing; presumably the request is returned - confirm.
 */
550 static struct ptlrpc_request *
551 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
553 struct ptlrpc_request *req;
557 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
559 RETURN(ERR_PTR(-ENOMEM));
561 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
563 ptlrpc_request_free(req);
567 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
568 ptlrpc_request_set_replen(req);
/*
 * Post-process the reply of an intent enqueue.
 *
 * Visible steps:
 *  - when the request has a transno or is flagged for replay, add
 *    LDLM_FL_INTENT_ONLY to the lock flags of the saved lock request;
 *  - on ELDLM_LOCK_ABORTED zero the lock handle, otherwise adopt the lock
 *    mode of the lock if it differs from einfo->ei_mode;
 *  - copy disposition and status from the DLM reply into it->d.lustre and
 *    record the lock mode, the handle cookie and the request there;
 *  - call mdc_clear_replay_flag() for requests that must not be replayed;
 *  - for OPEN, UNLINK, LOOKUP and GETATTR intents fetch the mdt_body and
 *    the optional EA, remote permission and capability buffers, and for
 *    replayable opens copy the EA into the RMF_EADATA request buffer;
 *  - for LAYOUT intents fetch the layout from RMF_DLM_LVB;
 *  - for a layout lock with LVB data available, allocate a private copy
 *    and install it as l_lvb_data if the lock has none yet.
 *
 * NOTE(review): many lines are missing from this listing (the last
 * parameter, local declarations, NULL checks, error returns and the
 * condition guarding the final OBD_FREE_LARGE), so return values are not
 * documented here - confirm against the full source.
 */
572 static int mdc_finish_enqueue(struct obd_export *exp,
573 struct ptlrpc_request *req,
574 struct ldlm_enqueue_info *einfo,
575 struct lookup_intent *it,
576 struct lustre_handle *lockh,
579 struct req_capsule *pill = &req->rq_pill;
580 struct ldlm_request *lockreq;
581 struct ldlm_reply *lockrep;
582 struct lustre_intent_data *intent = &it->d.lustre;
583 struct ldlm_lock *lock;
584 void *lvb_data = NULL;
589 /* Similarly, if we're going to replay this request, we don't want to
590 * actually get a lock, just perform the intent. */
591 if (req->rq_transno || req->rq_replay) {
592 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
593 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
596 if (rc == ELDLM_LOCK_ABORTED) {
598 memset(lockh, 0, sizeof(*lockh));
600 } else { /* rc = 0 */
601 lock = ldlm_handle2lock(lockh);
602 LASSERT(lock != NULL);
604 /* If the server gave us back a different lock mode, we should
605 * fix up our variables. */
606 if (lock->l_req_mode != einfo->ei_mode) {
607 ldlm_lock_addref(lockh, lock->l_req_mode);
608 ldlm_lock_decref(lockh, einfo->ei_mode);
609 einfo->ei_mode = lock->l_req_mode;
614 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
615 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Transfer the verdict of the server from the DLM reply to the intent. */
617 intent->it_disposition = (int)lockrep->lock_policy_res1;
618 intent->it_status = (int)lockrep->lock_policy_res2;
619 intent->it_lock_mode = einfo->ei_mode;
620 intent->it_lock_handle = lockh->cookie;
621 intent->it_data = req;
623 /* Technically speaking rq_transno must already be zero if
624 * it_status is in error, so the check is a bit redundant */
625 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
626 mdc_clear_replay_flag(req, intent->it_status);
628 /* If we're doing an IT_OPEN which did not result in an actual
629 * successful open, then we need to remove the bit which saves
630 * this request for unconditional replay.
632 * It's important that we do this first! Otherwise we might exit the
633 * function without doing so, and try to replay a failed create
635 if (it->it_op & IT_OPEN && req->rq_replay &&
636 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
637 mdc_clear_replay_flag(req, intent->it_status);
639 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
640 it->it_op, intent->it_disposition, intent->it_status);
642 /* We know what to expect, so we do any byte flipping required here */
643 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
644 struct mdt_body *body;
646 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
648 CERROR ("Can't swab mdt_body\n");
652 if (it_disposition(it, DISP_OPEN_OPEN) &&
653 !it_open_error(DISP_OPEN_OPEN, it)) {
655 * If this is a successful OPEN request, we need to set
656 * replay handler and data early, so that if replay
657 * happens immediately after swabbing below, new reply
658 * is swabbed by that handler correctly.
660 mdc_set_open_replay_data(NULL, NULL, it);
663 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
666 mdc_update_max_ea_from_body(exp, body);
669 * The eadata is opaque; just check that it is there.
670 * Eventually, obd_unpackmd() will check the contents.
672 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
677 /* save lvb data and length in case this is for layout
680 lvb_len = body->eadatasize;
683 * We save the reply LOV EA in case we have to replay a
684 * create for recovery. If we didn't allocate a large
685 * enough request buffer above we need to reallocate it
686 * here to hold the actual LOV EA.
688 * To not save LOV EA if request is not going to replay
689 * (for example error one).
691 if ((it->it_op & IT_OPEN) && req->rq_replay) {
693 if (req_capsule_get_size(pill, &RMF_EADATA,
696 mdc_realloc_openmsg(req, body);
698 req_capsule_shrink(pill, &RMF_EADATA,
702 req_capsule_set_size(pill, &RMF_EADATA,
706 lmm = req_capsule_client_get(pill, &RMF_EADATA);
708 memcpy(lmm, eadata, body->eadatasize);
712 if (body->valid & OBD_MD_FLRMTPERM) {
713 struct mdt_remote_perm *perm;
715 LASSERT(client_is_remote(exp));
716 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
717 lustre_swab_mdt_remote_perm);
721 if (body->valid & OBD_MD_FLMDSCAPA) {
722 struct lustre_capa *capa, *p;
724 capa = req_capsule_server_get(pill, &RMF_CAPA1);
728 if (it->it_op & IT_OPEN) {
729 /* client fid capa will be checked in replay */
730 p = req_capsule_client_get(pill, &RMF_CAPA2);
735 if (body->valid & OBD_MD_FLOSSCAPA) {
736 struct lustre_capa *capa;
738 capa = req_capsule_server_get(pill, &RMF_CAPA2);
742 } else if (it->it_op & IT_LAYOUT) {
743 /* maybe the lock was granted right away and layout
744 * is packed into RMF_DLM_LVB of req */
745 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
747 lvb_data = req_capsule_server_sized_get(pill,
748 &RMF_DLM_LVB, lvb_len);
749 if (lvb_data == NULL)
754 /* fill in stripe data for layout lock */
755 lock = ldlm_handle2lock(lockh);
756 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
759 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
760 ldlm_it2str(it->it_op), lvb_len);
762 OBD_ALLOC_LARGE(lmm, lvb_len);
767 memcpy(lmm, lvb_data, lvb_len);
769 /* install lvb_data */
770 lock_res_and_lock(lock);
771 if (lock->l_lvb_data == NULL) {
772 lock->l_lvb_type = LVB_T_LAYOUT;
773 lock->l_lvb_data = lmm;
774 lock->l_lvb_len = lvb_len;
777 unlock_res_and_lock(lock);
779 OBD_FREE_LARGE(lmm, lvb_len);
787 /* We always reserve enough space in the reply packet for a stripe MD, because
788 * we don't know in advance the file type. */
/*
 * Enqueue a lock on the MDS, optionally carrying an intent.
 *
 * Visible behaviour:
 *  - the resource name is built from op_data->op_fid1;
 *  - with an intent, LDLM_FL_HAS_INTENT is added to the flags and the
 *    inodebits policy is chosen from it->it_op: UPDATE for unlink,
 *    getattr and readdir, LAYOUT for layout, XATTR for getxattr and
 *    setxattr, LOOKUP otherwise; open intents switch to UPDATE later;
 *  - the first request-building branch is the flock case: lmm carries the
 *    policy, lmmsize must be 0 and res_id.name[3] is set to LDLM_FLOCK;
 *    the other branches call the mdc_intent_*_pack() helper or
 *    mdc_enqueue_pack() that matches it->it_op;
 *  - for IT_CREAT requests rq_no_retry_einprogress is set;
 *  - the request is tagged with the import generation sampled earlier and
 *    rq_sent is set to the current time plus the resend counter;
 *  - the rpc lock is taken and mdc_enter_request() is called before
 *    ldlm_cli_enqueue(), and both are released afterwards;
 *  - on enqueue failure the replay flag is cleared and the request is
 *    finished;
 *  - lock_policy_res2 of the reply is converted with ptlrpc_status_ntoh();
 *  - an IT_CREAT that got -EINPROGRESS finishes the request and compares
 *    the import generation to decide about a resend;
 *  - finally mdc_finish_enqueue() is called; its failure path drops the
 *    lock reference, zeroes the handle and finishes the request.
 *
 * NOTE(review): the conditions guarding several of these steps, the
 * resend jump and all return statements are missing from this listing -
 * confirm against the full source.
 */
789 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
790 struct lookup_intent *it, struct md_op_data *op_data,
791 struct lustre_handle *lockh, void *lmm, int lmmsize,
792 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
794 struct obd_device *obddev = class_exp2obd(exp);
795 struct ptlrpc_request *req = NULL;
796 __u64 flags, saved_flags = extra_lock_flags;
798 struct ldlm_res_id res_id;
799 static const ldlm_policy_data_t lookup_policy =
800 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
801 static const ldlm_policy_data_t update_policy =
802 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
803 static const ldlm_policy_data_t layout_policy =
804 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
805 static const ldlm_policy_data_t getxattr_policy = {
806 .l_inodebits = { MDS_INODELOCK_XATTR } };
807 ldlm_policy_data_t const *policy = &lookup_policy;
808 int generation, resends = 0;
809 struct ldlm_reply *lockrep;
810 enum lvb_type lvb_type = 0;
813 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
816 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
819 saved_flags |= LDLM_FL_HAS_INTENT;
820 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
821 policy = &update_policy;
822 else if (it->it_op & IT_LAYOUT)
823 policy = &layout_policy;
824 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
825 policy = &getxattr_policy;
828 LASSERT(reqp == NULL);
830 generation = obddev->u.cli.cl_import->imp_generation;
834 /* The only way right now is FLOCK, in this case we hide flock
835 policy as lmm, but lmmsize is 0 */
836 LASSERT(lmm && lmmsize == 0);
837 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
839 policy = (ldlm_policy_data_t *)lmm;
840 res_id.name[3] = LDLM_FLOCK;
841 } else if (it->it_op & IT_OPEN) {
842 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
844 policy = &update_policy;
845 einfo->ei_cbdata = NULL;
847 } else if (it->it_op & IT_UNLINK) {
848 req = mdc_intent_unlink_pack(exp, it, op_data);
849 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
850 req = mdc_intent_getattr_pack(exp, it, op_data);
851 } else if (it->it_op & IT_READDIR) {
852 req = mdc_enqueue_pack(exp, 0);
853 } else if (it->it_op & IT_LAYOUT) {
854 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
856 req = mdc_intent_layout_pack(exp, it, op_data);
857 lvb_type = LVB_T_LAYOUT;
858 } else if (it->it_op & IT_GETXATTR) {
859 req = mdc_intent_getxattr_pack(exp, it, op_data);
866 RETURN(PTR_ERR(req));
868 if (req != NULL && it && it->it_op & IT_CREAT)
869 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
871 req->rq_no_retry_einprogress = 1;
874 req->rq_generation_set = 1;
875 req->rq_import_generation = generation;
876 req->rq_sent = cfs_time_current_sec() + resends;
879 /* It is important to obtain rpc_lock first (if applicable), so that
880 * threads that are serialised with rpc_lock are not polluting our
881 * rpcs in flight counter. We do not do flock request limiting, though*/
883 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
884 rc = mdc_enter_request(&obddev->u.cli);
886 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
887 mdc_clear_replay_flag(req, 0);
888 ptlrpc_req_finished(req);
893 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
894 0, lvb_type, lockh, 0);
896 /* For flock requests we immediately return without further
897 delay and let caller deal with the rest, since rest of
898 this function metadata processing makes no sense for flock
899 requests anyway. But in case of problem during comms with
900 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
901 can not rely on caller and this mainly for F_UNLCKs
902 (explicits or automatically generated by Kernel to clean
903 current FLocks upon exit) that can't be trashed */
904 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
905 (einfo->ei_type == LDLM_FLOCK) &&
906 (einfo->ei_mode == LCK_NL))
911 mdc_exit_request(&obddev->u.cli);
912 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
915 CERROR("ldlm_cli_enqueue: %d\n", rc);
916 mdc_clear_replay_flag(req, rc);
917 ptlrpc_req_finished(req);
921 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
922 LASSERT(lockrep != NULL);
924 lockrep->lock_policy_res2 =
925 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
927 /* Retry the create infinitely when we get -EINPROGRESS from
928 * server. This is required by the new quota design. */
929 if (it && it->it_op & IT_CREAT &&
930 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
931 mdc_clear_replay_flag(req, rc);
932 ptlrpc_req_finished(req);
935 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
936 obddev->obd_name, resends, it->it_op,
937 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
939 if (generation == obddev->u.cli.cl_import->imp_generation) {
942 CDEBUG(D_HA, "resend cross eviction\n");
947 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
949 if (lustre_handle_is_used(lockh)) {
950 ldlm_lock_decref(lockh, einfo->ei_mode);
951 memset(lockh, 0, sizeof(*lockh));
953 ptlrpc_req_finished(req);
/*
 * Finish an intent lock request once the enqueue reply has been parsed.
 *
 * Visible steps:
 *  - sanity-check the request pointers;
 *  - if the server never executed the intent (DISP_IT_EXECD not set)
 *    return the non-zero it_status;
 *  - evaluate it_open_error() for DISP_IT_EXECD, fetch the mdt_body and,
 *    when revalidating (op_fid2 sane, M_CHECK_STALE set, not IT_GETATTR),
 *    compare the returned fid1 with op_fid2 and op_fid3 to detect stale
 *    data;
 *  - evaluate it_open_error() for DISP_LOOKUP_EXECD;
 *  - take an extra request reference once per successful create and once
 *    per successful open, marking this with DISP_ENQ_CREATE_REF and
 *    DISP_ENQ_OPEN_REF;
 *  - if lockh resolves to a lock, assert that its resource name matches
 *    fid1 and look for an already granted lock with the same policy via
 *    ldlm_lock_match(); when one is found, the new lock is dropped with
 *    ldlm_lock_decref_and_cancel() and the intent and lockh are switched
 *    to the matched lock.
 *
 * NOTE(review): the checks on the rc values and the return statements are
 * missing from this listing - confirm against the full source.
 */
958 static int mdc_finish_intent_lock(struct obd_export *exp,
959 struct ptlrpc_request *request,
960 struct md_op_data *op_data,
961 struct lookup_intent *it,
962 struct lustre_handle *lockh)
964 struct lustre_handle old_lock;
965 struct mdt_body *mdt_body;
966 struct ldlm_lock *lock;
970 LASSERT(request != NULL);
971 LASSERT(request != LP_POISON);
972 LASSERT(request->rq_repmsg != LP_POISON);
974 if (!it_disposition(it, DISP_IT_EXECD)) {
975 /* The server failed before it even started executing the
976 * intent, i.e. because it couldn't unpack the request. */
977 LASSERT(it->d.lustre.it_status != 0);
978 RETURN(it->d.lustre.it_status);
980 rc = it_open_error(DISP_IT_EXECD, it);
984 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
985 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
987 /* If we were revalidating a fid/name pair, mark the intent in
988 * case we fail and get called again from lookup */
989 if (fid_is_sane(&op_data->op_fid2) &&
990 it->it_create_mode & M_CHECK_STALE &&
991 it->it_op != IT_GETATTR) {
992 /* Also: did we find the same inode? */
993 /* server can return one of two fids:
994 * op_fid2 - new allocated fid - if file is created.
995 * op_fid3 - existent fid - if file only open.
996 * op_fid3 is saved in lmv_intent_open */
997 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
998 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
/* NOTE(review): the message below prints op_fid2 for both of its first
 * two DFID fields although the test above compares against op_fid2 and
 * op_fid3; the second argument may have been meant to be op_fid3 -
 * confirm before changing. */
999 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
1000 "\n", PFID(&op_data->op_fid2),
1001 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
1006 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1010 /* keep requests around for the multiple phases of the call
1011 * this shows the DISP_XX must guarantee we make it into the call
1013 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1014 it_disposition(it, DISP_OPEN_CREATE) &&
1015 !it_open_error(DISP_OPEN_CREATE, it)) {
1016 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1017 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1019 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1020 it_disposition(it, DISP_OPEN_OPEN) &&
1021 !it_open_error(DISP_OPEN_OPEN, it)) {
1022 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1023 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1024 /* BUG 11546 - eviction in the middle of open rpc processing */
1025 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1028 if (it->it_op & IT_CREAT) {
1029 /* XXX this belongs in ll_create_it */
1030 } else if (it->it_op == IT_OPEN) {
1031 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1033 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1036 /* If we already have a matching lock, then cancel the new
1037 * one. We have to set the data here instead of in
1038 * mdc_enqueue, because we need to use the child's inode as
1039 * the l_ast_data to match, and that's not available until
1040 * intent_finish has performed the iget().) */
1041 lock = ldlm_handle2lock(lockh);
1043 ldlm_policy_data_t policy = lock->l_policy_data;
1044 LDLM_DEBUG(lock, "matching against this");
1046 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1047 &lock->l_resource->lr_name),
1048 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1049 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1050 LDLM_LOCK_PUT(lock);
1052 memcpy(&old_lock, lockh, sizeof(*lockh));
1053 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1054 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1055 ldlm_lock_decref_and_cancel(lockh,
1056 it->d.lustre.it_lock_mode);
1057 memcpy(lockh, &old_lock, sizeof(old_lock));
1058 it->d.lustre.it_lock_handle = lockh->cookie;
1061 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1062 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1063 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * Check whether a usable lock for the intent is already held.
 *
 * If the intent already carries a lock handle, that handle is revalidated
 * with ldlm_revalidate_lock_handle().  Otherwise the resource name is
 * built from fid, the inodebits to match are chosen in a switch on
 * it->it_op (a combined set that includes UPDATE and LOOKUP, LAYOUT
 * alone, or LOOKUP alone) and ldlm_lock_match() is asked for a granted
 * lock in any of the modes CR, CW, PR or PW.  The handle cookie and mode
 * that were found are stored in the intent, or both are reset to 0.
 *
 * NOTE(review): the case labels, the test on mode and the return
 * statement are missing from this listing - confirm against the full
 * source.
 */
1067 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1068 struct lu_fid *fid, __u64 *bits)
1070 /* We could just return 1 immediately, but since we should only
1071 * be called in revalidate_it if we already have a lock, let's
1073 struct ldlm_res_id res_id;
1074 struct lustre_handle lockh;
1075 ldlm_policy_data_t policy;
1079 if (it->d.lustre.it_lock_handle) {
1080 lockh.cookie = it->d.lustre.it_lock_handle;
1081 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1083 fid_build_reg_res_name(fid, &res_id);
1084 switch (it->it_op) {
1086 /* File attributes are held under multiple bits:
1087 * nlink is under lookup lock, size and times are
1088 * under UPDATE lock and recently we've also got
1089 * a separate permissions lock for owner/group/acl that
1090 * were protected by lookup lock before.
1091 * Getattr must provide all of that information,
1092 * so we need to ensure we have all of those locks.
1093 * Unfortunately, if the bits are split across multiple
1094 * locks, there's no easy way to match all of them here,
1095 * so an extra RPC would be performed to fetch all
1096 * of those bits at once for now. */
1097 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1098 * but for old MDTs (< 2.4), permission is covered
1099 * by LOOKUP lock, so it needs to match all bits here.*/
1100 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1101 MDS_INODELOCK_LOOKUP |
1105 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1108 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1112 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1113 LDLM_FL_BLOCK_GRANTED, &res_id,
1114 LDLM_IBITS, &policy,
1115 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1119 it->d.lustre.it_lock_handle = lockh.cookie;
1120 it->d.lustre.it_lock_mode = mode;
1122 it->d.lustre.it_lock_handle = 0;
1123 it->d.lustre.it_lock_mode = 0;
1130 * This long block is all about fixing up the lock and request state
1131 * so that it is correct as of the moment _before_ the operation was
1132 * applied; that way, the VFS will think that everything is normal and
1133 * call Lustre's regular VFS methods.
1135 * If we're performing a creation, that means that unless the creation
1136 * failed with EEXIST, we should fake up a negative dentry.
1138 * For everything else, we want to lookup to succeed.
1140 * One additional note: if CREATE or OPEN succeeded, we add an extra
1141 * reference to the request because we need to keep it around until
1142 * ll_create/ll_open gets called.
1144 * The server will return to us, in it_disposition, an indication of
1145 * exactly what d.lustre.it_status refers to.
1147 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1148 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1149 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1150 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1153 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/*
 * mdc_intent_lock: entry point for intent-based locking.
 *
 * Visible steps:
 *  - prepare an LDLM_IBITS enqueue description whose mode comes from
 *    it_to_lock_mode(it), with cb_blocking and ldlm_completion_ast as
 *    callbacks;
 *  - for LOOKUP and GETATTR intents on a sane op_fid2, reset the lock
 *    handle of the intent and try mdc_revalidate_lock() first; the
 *    function leaves early when that succeeds or when a name was given;
 *  - for IT_CREAT without a sane op_fid2, allocate a fid with
 *    mdc_fid_alloc();
 *  - call mdc_enqueue(), publish the request through *reqp and finish
 *    with mdc_finish_intent_lock().
 *
 * NOTE(review): the error checks and return statements are missing from
 * this listing - confirm against the full source.
 */
1156 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1157 void *lmm, int lmmsize, struct lookup_intent *it,
1158 int lookup_flags, struct ptlrpc_request **reqp,
1159 ldlm_blocking_callback cb_blocking,
1160 __u64 extra_lock_flags)
1162 struct ldlm_enqueue_info einfo = {
1163 .ei_type = LDLM_IBITS,
1164 .ei_mode = it_to_lock_mode(it),
1165 .ei_cb_bl = cb_blocking,
1166 .ei_cb_cp = ldlm_completion_ast,
1168 struct lustre_handle lockh;
1173 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1174 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1175 op_data->op_name, PFID(&op_data->op_fid2),
1176 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1180 if (fid_is_sane(&op_data->op_fid2) &&
1181 (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1182 /* We could just return 1 immediately, but since we should only
1183 * be called in revalidate_it if we already have a lock, let's
1185 it->d.lustre.it_lock_handle = 0;
1186 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1187 /* Only return failure if it was not GETATTR by cfid
1188 (from inode_revalidate) */
1189 if (rc || op_data->op_namelen != 0)
1193 /* For case if upper layer did not alloc fid, do it now. */
1194 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1195 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1197 CERROR("Can't alloc new fid, rc %d\n", rc);
1201 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
1206 *reqp = it->d.lustre.it_data;
1207 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply interpreter of the asynchronous getattr intent.
 *
 * Recovers the export, enqueue info and md_enqueue_info from the
 * struct mdc_getattr_args passed as args, releases the request slot with
 * mdc_exit_request() and completes the enqueue with
 * ldlm_cli_enqueue_fini().  On the failure path the error is logged and
 * the replay flag is cleared.  Otherwise lock_policy_res2 of the reply
 * is converted with ptlrpc_status_ntoh() and the reply is processed by
 * mdc_finish_enqueue() and mdc_finish_intent_lock().  At the end einfo is
 * freed and the completion callback minfo->mi_cb is invoked with the
 * request, minfo and rc.
 *
 * NOTE(review): the third parameter, the assignment of it, the error
 * checks with their jumps and the return statement are missing from this
 * listing - confirm against the full source.
 */
1211 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1212 struct ptlrpc_request *req,
1215 struct mdc_getattr_args *ga = args;
1216 struct obd_export *exp = ga->ga_exp;
1217 struct md_enqueue_info *minfo = ga->ga_minfo;
1218 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1219 struct lookup_intent *it;
1220 struct lustre_handle *lockh;
1221 struct obd_device *obddev;
1222 struct ldlm_reply *lockrep;
1223 __u64 flags = LDLM_FL_HAS_INTENT;
1227 lockh = &minfo->mi_lockh;
1229 obddev = class_exp2obd(exp);
1231 mdc_exit_request(&obddev->u.cli);
1232 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1235 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1236 &flags, NULL, 0, lockh, rc);
1238 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1239 mdc_clear_replay_flag(req, rc);
1243 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1244 LASSERT(lockrep != NULL);
1246 lockrep->lock_policy_res2 =
1247 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1249 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1253 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1257 OBD_FREE_PTR(einfo);
1258 minfo->mi_cb(req, minfo, rc);
/*
 * Start an asynchronous getattr intent.
 *
 * Builds the resource name from op_fid1 of minfo->mi_data, packs the
 * request with mdc_intent_getattr_pack() and returns PTR_ERR(req) on the
 * packing failure path.  After mdc_enter_request() the lock is enqueued
 * with ldlm_cli_enqueue() using a policy of MDS_INODELOCK_LOOKUP |
 * MDS_INODELOCK_UPDATE and a final argument of 1, in contrast to the 0
 * passed by mdc_enqueue().  The failure paths finish the request (and
 * release the request slot when it was already taken).  On success the
 * arguments are stored in the async args of the request,
 * mdc_intent_getattr_async_interpret() is installed as the reply
 * interpreter and the request is queued with ptlrpcd_add_req().
 *
 * NOTE(review): the error checks, the assignment of ga_exp and the return
 * statements are missing from this listing, and the listing ends inside
 * this function - confirm against the full source.
 */
1262 int mdc_intent_getattr_async(struct obd_export *exp,
1263 struct md_enqueue_info *minfo,
1264 struct ldlm_enqueue_info *einfo)
1266 struct md_op_data *op_data = &minfo->mi_data;
1267 struct lookup_intent *it = &minfo->mi_it;
1268 struct ptlrpc_request *req;
1269 struct mdc_getattr_args *ga;
1270 struct obd_device *obddev = class_exp2obd(exp);
1271 struct ldlm_res_id res_id;
1272 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1273 * for statahead currently. Consider CMD in future, such two bits
1274 * maybe managed by different MDS, should be adjusted then. */
1275 ldlm_policy_data_t policy = {
1276 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1277 MDS_INODELOCK_UPDATE }
1280 __u64 flags = LDLM_FL_HAS_INTENT;
1283 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1284 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1285 ldlm_it2str(it->it_op), it->it_flags);
1287 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1288 req = mdc_intent_getattr_pack(exp, it, op_data);
1290 RETURN(PTR_ERR(req));
1292 rc = mdc_enter_request(&obddev->u.cli);
1294 ptlrpc_req_finished(req);
1298 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1299 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1301 mdc_exit_request(&obddev->u.cli);
1302 ptlrpc_req_finished(req);
1306 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1307 ga = ptlrpc_req_async_args(req);
1309 ga->ga_minfo = minfo;
1310 ga->ga_einfo = einfo;
1312 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1313 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);