4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/lustre_intent.h>
43 # include <liblustre.h>
47 #include <obd_class.h>
48 #include <lustre_dlm.h>
49 #include <lustre_fid.h> /* fid_res_name_eq() */
50 #include <lustre_mdc.h>
51 #include <lustre_net.h>
52 #include <lustre_req_layout.h>
53 #include "mdc_internal.h"
/* Argument bundle for the async getattr path: carries the export plus the
 * enqueue/intent bookkeeping from mdc_intent_getattr_async() to its RPC
 * interpret callback. */
55 struct mdc_getattr_args {
56 struct obd_export *ga_exp;
57 struct md_enqueue_info *ga_minfo;
58 struct ldlm_enqueue_info *ga_einfo;
/* Test whether the server set disposition bit(s) @flag in the intent reply. */
61 int it_disposition(struct lookup_intent *it, int flag)
63 return it->d.lustre.it_disposition & flag;
65 EXPORT_SYMBOL(it_disposition);
/* Set disposition bit(s) @flag on the intent (client-side bookkeeping). */
67 void it_set_disposition(struct lookup_intent *it, int flag)
69 it->d.lustre.it_disposition |= flag;
71 EXPORT_SYMBOL(it_set_disposition);
/* Clear disposition bit(s) @flag from the intent. */
73 void it_clear_disposition(struct lookup_intent *it, int flag)
75 it->d.lustre.it_disposition &= ~flag;
77 EXPORT_SYMBOL(it_clear_disposition);
/* Return the server status (it_status) for the first executed open phase at
 * or past @phase, walking phases from most to least specific
 * (LEASE -> OPEN -> CREATE -> LOOKUP_EXECD -> IT_EXECD). If the executed
 * phase precedes @phase the elided branches presumably return 0
 * (NOTE(review): fall-through returns are not visible here — confirm). */
79 int it_open_error(int phase, struct lookup_intent *it)
81 if (it_disposition(it, DISP_OPEN_LEASE)) {
82 if (phase >= DISP_OPEN_LEASE)
83 return it->d.lustre.it_status;
87 if (it_disposition(it, DISP_OPEN_OPEN)) {
88 if (phase >= DISP_OPEN_OPEN)
89 return it->d.lustre.it_status;
94 if (it_disposition(it, DISP_OPEN_CREATE)) {
95 if (phase >= DISP_OPEN_CREATE)
96 return it->d.lustre.it_status;
101 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
102 if (phase >= DISP_LOOKUP_EXECD)
103 return it->d.lustre.it_status;
108 if (it_disposition(it, DISP_IT_EXECD)) {
109 if (phase >= DISP_IT_EXECD)
110 return it->d.lustre.it_status;
/* No disposition bit matched: log the unexpected state. */
114 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
115 it->d.lustre.it_status);
119 EXPORT_SYMBOL(it_open_error);
121 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach inode @data to the lock's resource (lr_lvb_inode) under the
 * resource lock, and optionally report the lock's inodebits via @bits.
 * If a different inode is already attached it must be on its way out
 * (I_FREEING), otherwise the LASSERTF fires. */
122 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
125 struct ldlm_lock *lock;
126 struct inode *new_inode = data;
135 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
137 LASSERT(lock != NULL);
138 lock_res_and_lock(lock);
140 if (lock->l_resource->lr_lvb_inode &&
141 lock->l_resource->lr_lvb_inode != data) {
142 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
143 LASSERTF(old_inode->i_state & I_FREEING,
144 "Found existing inode %p/%lu/%u state %lu in lock: "
145 "setting data to %p/%lu/%u\n", old_inode,
146 old_inode->i_ino, old_inode->i_generation,
148 new_inode, new_inode->i_ino, new_inode->i_generation);
151 lock->l_resource->lr_lvb_inode = new_inode;
/* NOTE(review): *bits assignment is presumably guarded by a "if (bits)"
 * check on an elided line — confirm against the full source. */
153 *bits = lock->l_policy_data.l_inodebits.bits;
155 unlock_res_and_lock(lock);
/* Look for an already-granted lock on @fid matching @type/@policy/@mode.
 * Returns the matched lock mode (handle in @lockh), or 0 for no match. */
161 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
162 const struct lu_fid *fid, ldlm_type_t type,
163 ldlm_policy_data_t *policy, ldlm_mode_t mode,
164 struct lustre_handle *lockh)
166 struct ldlm_res_id res_id;
170 fid_build_reg_res_name(fid, &res_id);
171 /* LU-4405: Clear bits not supported by server */
172 policy->l_inodebits.bits &= exp_connect_ibits(exp);
173 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
174 &res_id, type, policy, mode, lockh, 0);
/* Cancel all unused (no reader/writer references) DLM locks on @fid's
 * resource that match @policy/@mode, honoring @flags. */
178 int mdc_cancel_unused(struct obd_export *exp,
179 const struct lu_fid *fid,
180 ldlm_policy_data_t *policy,
182 ldlm_cancel_flags_t flags,
185 struct ldlm_res_id res_id;
186 struct obd_device *obd = class_exp2obd(exp);
191 fid_build_reg_res_name(fid, &res_id);
192 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
193 policy, mode, flags, opaque);
/* Detach any inode pointer cached on @fid's DLM resource (lr_lvb_inode).
 * Used when the inode is going away so stale pointers are not left behind.
 * NOTE(review): the lr_lvb_inode store is presumably done under the resource
 * lock on elided lines — confirm. */
197 int mdc_null_inode(struct obd_export *exp,
198 const struct lu_fid *fid)
200 struct ldlm_res_id res_id;
201 struct ldlm_resource *res;
202 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
205 LASSERTF(ns != NULL, "no namespace passed\n");
207 fid_build_reg_res_name(fid, &res_id);
209 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
214 res->lr_lvb_inode = NULL;
/* Drop the reference taken by ldlm_resource_get(). */
217 ldlm_resource_putref(res);
221 /* find any ldlm lock of the inode in mdc
/* Iterate locks on @fid's resource with @it; translate the iterator result
 * (LDLM_ITER_STOP/CONTINUE) into this function's return convention —
 * exact return values are on elided lines (NOTE(review): confirm). */
225 int mdc_find_cbdata(struct obd_export *exp,
226 const struct lu_fid *fid,
227 ldlm_iterator_t it, void *data)
229 struct ldlm_res_id res_id;
233 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
234 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
236 if (rc == LDLM_ITER_STOP)
238 else if (rc == LDLM_ITER_CONTINUE)
/* Drop the replay flag from a request that failed with @rc so it is not
 * kept for recovery replay; complain if a transno was already assigned. */
243 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
245 /* Don't hold error requests for replay. */
246 if (req->rq_replay) {
247 spin_lock(&req->rq_lock);
/* rq_replay is cleared between these lock/unlock calls (line elided). */
249 spin_unlock(&req->rq_lock);
251 if (rc && req->rq_transno != 0) {
252 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
257 /* Save a large LOV EA into the request buffer so that it is available
258 * for replay. We don't do this in the initial request because the
259 * original request doesn't need this buffer (at most it sends just the
260 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
261 * buffer and may also be difficult to allocate and save a very large
262 * request buffer for each open. (bug 5707)
264 * OOM here may cause recovery failure if lmm is needed (only for the
265 * original open if the MDS crashed just when this client also OOM'd)
266 * but this is incredibly unlikely, and questionable whether the client
267 * could do MDS recovery under OOM anyways... */
268 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
269 struct mdt_body *body)
273 /* FIXME: remove this explicit offset. */
274 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
/* On enlarge failure, give up on saving the EA: clear the EA-size valid
 * bit so replay does not expect the (missing) buffer. */
277 CERROR("Can't enlarge segment %d size to %d\n",
278 DLM_INTENT_REC_OFF + 4, body->eadatasize);
279 body->valid &= ~OBD_MD_FLEASIZE;
280 body->eadatasize = 0;
/* Build an LDLM_INTENT_OPEN request: cancel conflicting local locks
 * (OPEN locks on the child fid, UPDATE lock on the parent for CREATE),
 * allocate the request, reserve buffer sizes, and pack the intent plus
 * the open body. Returns the prepared request or ERR_PTR on failure. */
284 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
285 struct lookup_intent *it,
286 struct md_op_data *op_data,
287 void *lmm, int lmmsize,
290 struct ptlrpc_request *req;
291 struct obd_device *obddev = class_exp2obd(exp);
292 struct ldlm_intent *lit;
293 CFS_LIST_HEAD(cancels);
/* Force a regular-file mode bit pattern for the create mode. */
299 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
301 /* XXX: openlock is not cancelled for cross-refs. */
302 /* If inode is known, cancel conflicting OPEN locks. */
303 if (fid_is_sane(&op_data->op_fid2)) {
304 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
305 if (it->it_flags & FMODE_WRITE)
/* NOTE(review): mode selection assignments are on elided lines. */
310 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
313 else if (it->it_flags & FMODE_EXEC)
319 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
324 /* If CREATE, cancel parent's UPDATE lock. */
325 if (it->it_op & IT_CREAT)
329 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
331 MDS_INODELOCK_UPDATE);
333 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
334 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: release the gathered cancel list before erroring. */
336 ldlm_lock_list_put(&cancels, l_bl_ast, count);
337 RETURN(ERR_PTR(-ENOMEM));
340 /* parent capability */
341 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
342 /* child capability, reserve the size according to parent capa, it will
343 * be filled after we get the reply */
344 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
346 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
347 op_data->op_namelen + 1);
348 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
349 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
351 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
353 ptlrpc_request_free(req);
/* Open requests are replayable if the import supports replay. */
357 spin_lock(&req->rq_lock);
358 req->rq_replay = req->rq_import->imp_replayable;
359 spin_unlock(&req->rq_lock);
361 /* pack the intent */
362 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
363 lit->opc = (__u64)it->it_op;
365 /* pack the intended request */
366 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
369 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
370 obddev->u.cli.cl_max_mds_easize);
372 /* for remote client, fetch remote perm for current user */
373 if (client_is_remote(exp))
374 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
375 sizeof(struct mdt_remote_perm));
376 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETXATTR request; server reply buffers (EADATA,
 * EAVALS, EAVALS_LENS) are all sized to the negotiated max EA size. */
380 static struct ptlrpc_request *
381 mdc_intent_getxattr_pack(struct obd_export *exp,
382 struct lookup_intent *it,
383 struct md_op_data *op_data)
385 struct ptlrpc_request *req;
386 struct ldlm_intent *lit;
387 int rc, count = 0, maxdata;
388 CFS_LIST_HEAD(cancels);
392 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
393 &RQF_LDLM_INTENT_GETXATTR);
395 RETURN(ERR_PTR(-ENOMEM));
397 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
399 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
401 ptlrpc_request_free(req);
405 /* pack the intent */
406 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
407 lit->opc = IT_GETXATTR;
/* Largest EA payload the server agreed to at connect time. */
409 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
411 /* pack the intended request */
412 mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
413 op_data->op_valid, maxdata, -1, 0);
415 req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
416 RCL_SERVER, maxdata);
418 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
419 RCL_SERVER, maxdata);
421 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
422 RCL_SERVER, maxdata);
424 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_UNLINK request and reserve server reply space for
 * the returned MD (EA) and unlink cookies. */
429 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
430 struct lookup_intent *it,
431 struct md_op_data *op_data)
433 struct ptlrpc_request *req;
434 struct obd_device *obddev = class_exp2obd(exp);
435 struct ldlm_intent *lit;
439 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
440 &RQF_LDLM_INTENT_UNLINK);
442 RETURN(ERR_PTR(-ENOMEM));
444 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
445 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
446 op_data->op_namelen + 1);
448 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
450 ptlrpc_request_free(req);
454 /* pack the intent */
455 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
456 lit->opc = (__u64)it->it_op;
458 /* pack the intended request */
459 mdc_unlink_pack(req, op_data);
461 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
462 obddev->u.cli.cl_max_mds_easize);
463 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
464 obddev->u.cli.cl_max_mds_cookiesize);
465 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETATTR request asking for attrs, EA, capa and
 * either remote perms (remote client) or ACLs (local client). */
469 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
470 struct lookup_intent *it,
471 struct md_op_data *op_data)
473 struct ptlrpc_request *req;
474 struct obd_device *obddev = class_exp2obd(exp);
475 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
476 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
477 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
478 (client_is_remote(exp) ?
479 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
480 struct ldlm_intent *lit;
484 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
485 &RQF_LDLM_INTENT_GETATTR);
487 RETURN(ERR_PTR(-ENOMEM));
489 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
490 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
491 op_data->op_namelen + 1);
493 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
495 ptlrpc_request_free(req);
499 /* pack the intent */
500 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
501 lit->opc = (__u64)it->it_op;
503 /* pack the intended request */
504 mdc_getattr_pack(req, valid, it->it_flags, op_data,
505 obddev->u.cli.cl_max_mds_easize);
507 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
508 obddev->u.cli.cl_max_mds_easize);
509 if (client_is_remote(exp))
510 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
511 sizeof(struct mdt_remote_perm));
512 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_LAYOUT request: the layout itself comes back in
 * the DLM LVB, so reserve max EA size for it on the server side. */
516 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
517 struct lookup_intent *it,
518 struct md_op_data *unused)
520 struct obd_device *obd = class_exp2obd(exp);
521 struct ptlrpc_request *req;
522 struct ldlm_intent *lit;
523 struct layout_intent *layout;
527 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
528 &RQF_LDLM_INTENT_LAYOUT);
530 RETURN(ERR_PTR(-ENOMEM));
532 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
533 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
535 ptlrpc_request_free(req);
539 /* pack the intent */
540 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
541 lit->opc = (__u64)it->it_op;
543 /* pack the layout intent request */
544 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
545 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
546 * set for replication */
547 layout->li_opc = LAYOUT_INTENT_ACCESS;
549 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
550 obd->u.cli.cl_max_mds_easize);
551 ptlrpc_request_set_replen(req);
/* Build a plain (no-intent) LDLM_ENQUEUE request with a server-side LVB
 * buffer of @lvb_len bytes. */
555 static struct ptlrpc_request *
556 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
558 struct ptlrpc_request *req;
562 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
564 RETURN(ERR_PTR(-ENOMEM));
566 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
568 ptlrpc_request_free(req);
572 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
573 ptlrpc_request_set_replen(req);
/* Post-process an intent enqueue reply: fix up lock mode, copy the server
 * disposition/status into the intent, manage the replay flag, swab the
 * reply body, save the LOV EA for open replay, and install layout LVB
 * data on the lock for IT_LAYOUT. Too intertwined with elided lines to
 * restructure safely; comments only. */
577 static int mdc_finish_enqueue(struct obd_export *exp,
578 struct ptlrpc_request *req,
579 struct ldlm_enqueue_info *einfo,
580 struct lookup_intent *it,
581 struct lustre_handle *lockh,
584 struct req_capsule *pill = &req->rq_pill;
585 struct ldlm_request *lockreq;
586 struct ldlm_reply *lockrep;
587 struct lustre_intent_data *intent = &it->d.lustre;
588 struct ldlm_lock *lock;
589 void *lvb_data = NULL;
594 /* Similarly, if we're going to replay this request, we don't want to
595 * actually get a lock, just perform the intent. */
596 if (req->rq_transno || req->rq_replay) {
597 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
598 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Lock was aborted: no lock handle to keep. */
601 if (rc == ELDLM_LOCK_ABORTED) {
603 memset(lockh, 0, sizeof(*lockh));
605 } else { /* rc = 0 */
606 lock = ldlm_handle2lock(lockh);
607 LASSERT(lock != NULL);
609 /* If the server gave us back a different lock mode, we should
610 * fix up our variables. */
611 if (lock->l_req_mode != einfo->ei_mode) {
612 ldlm_lock_addref(lockh, lock->l_req_mode);
613 ldlm_lock_decref(lockh, einfo->ei_mode);
614 einfo->ei_mode = lock->l_req_mode;
/* Mirror the server's reply into the client-side intent state. */
619 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
620 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
622 intent->it_disposition = (int)lockrep->lock_policy_res1;
623 intent->it_status = (int)lockrep->lock_policy_res2;
624 intent->it_lock_mode = einfo->ei_mode;
625 intent->it_lock_handle = lockh->cookie;
626 intent->it_data = req;
628 /* Technically speaking rq_transno must already be zero if
629 * it_status is in error, so the check is a bit redundant */
630 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
631 mdc_clear_replay_flag(req, intent->it_status);
633 /* If we're doing an IT_OPEN which did not result in an actual
634 * successful open, then we need to remove the bit which saves
635 * this request for unconditional replay.
637 * It's important that we do this first! Otherwise we might exit the
638 * function without doing so, and try to replay a failed create
640 if (it->it_op & IT_OPEN && req->rq_replay &&
641 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
642 mdc_clear_replay_flag(req, intent->it_status);
644 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
645 it->it_op, intent->it_disposition, intent->it_status);
647 /* We know what to expect, so we do any byte flipping required here */
648 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
649 struct mdt_body *body;
651 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
653 CERROR ("Can't swab mdt_body\n");
657 if (it_disposition(it, DISP_OPEN_OPEN) &&
658 !it_open_error(DISP_OPEN_OPEN, it)) {
660 * If this is a successful OPEN request, we need to set
661 * replay handler and data early, so that if replay
662 * happens immediately after swabbing below, new reply
663 * is swabbed by that handler correctly.
665 mdc_set_open_replay_data(NULL, NULL, it);
668 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
671 mdc_update_max_ea_from_body(exp, body);
674 * The eadata is opaque; just check that it is there.
675 * Eventually, obd_unpackmd() will check the contents.
677 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
682 /* save lvb data and length in case this is for layout
685 lvb_len = body->eadatasize;
688 * We save the reply LOV EA in case we have to replay a
689 * create for recovery. If we didn't allocate a large
690 * enough request buffer above we need to reallocate it
691 * here to hold the actual LOV EA.
693 * To not save LOV EA if request is not going to replay
694 * (for example error one).
696 if ((it->it_op & IT_OPEN) && req->rq_replay) {
698 if (req_capsule_get_size(pill, &RMF_EADATA,
701 mdc_realloc_openmsg(req, body);
703 req_capsule_shrink(pill, &RMF_EADATA,
707 req_capsule_set_size(pill, &RMF_EADATA,
711 lmm = req_capsule_client_get(pill, &RMF_EADATA);
713 memcpy(lmm, eadata, body->eadatasize);
717 if (body->valid & OBD_MD_FLRMTPERM) {
718 struct mdt_remote_perm *perm;
720 LASSERT(client_is_remote(exp));
721 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
722 lustre_swab_mdt_remote_perm);
726 if (body->valid & OBD_MD_FLMDSCAPA) {
727 struct lustre_capa *capa, *p;
729 capa = req_capsule_server_get(pill, &RMF_CAPA1);
733 if (it->it_op & IT_OPEN) {
734 /* client fid capa will be checked in replay */
735 p = req_capsule_client_get(pill, &RMF_CAPA2);
740 if (body->valid & OBD_MD_FLOSSCAPA) {
741 struct lustre_capa *capa;
743 capa = req_capsule_server_get(pill, &RMF_CAPA2);
747 } else if (it->it_op & IT_LAYOUT) {
748 /* maybe the lock was granted right away and layout
749 * is packed into RMF_DLM_LVB of req */
750 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
752 lvb_data = req_capsule_server_sized_get(pill,
753 &RMF_DLM_LVB, lvb_len);
754 if (lvb_data == NULL)
759 /* fill in stripe data for layout lock */
760 lock = ldlm_handle2lock(lockh);
761 if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
764 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
765 ldlm_it2str(it->it_op), lvb_len);
767 OBD_ALLOC_LARGE(lmm, lvb_len);
772 memcpy(lmm, lvb_data, lvb_len);
774 /* install lvb_data */
775 lock_res_and_lock(lock);
776 if (lock->l_lvb_data == NULL) {
777 lock->l_lvb_type = LVB_T_LAYOUT;
778 lock->l_lvb_data = lmm;
779 lock->l_lvb_len = lvb_len;
/* Someone else installed an LVB first: free our copy. */
782 unlock_res_and_lock(lock);
784 OBD_FREE_LARGE(lmm, lvb_len);
792 /* We always reserve enough space in the reply packet for a stripe MD, because
793 * we don't know in advance the file type. */
/* Main intent-enqueue entry: choose an inodebits policy from the intent op,
 * pack the matching intent request, send the enqueue under the MDC rpc
 * lock / in-flight limiter, retry forever on -EINPROGRESS for creates, and
 * finish via mdc_finish_enqueue(). Retry/goto labels sit on elided lines,
 * so the code is left untouched. */
794 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
795 struct lookup_intent *it, struct md_op_data *op_data,
796 struct lustre_handle *lockh, void *lmm, int lmmsize,
797 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
799 struct obd_device *obddev = class_exp2obd(exp);
800 struct ptlrpc_request *req = NULL;
801 __u64 flags, saved_flags = extra_lock_flags;
803 struct ldlm_res_id res_id;
804 static const ldlm_policy_data_t lookup_policy =
805 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
806 static const ldlm_policy_data_t update_policy =
807 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
808 static const ldlm_policy_data_t layout_policy =
809 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
810 static const ldlm_policy_data_t getxattr_policy = {
811 .l_inodebits = { MDS_INODELOCK_XATTR } };
812 ldlm_policy_data_t const *policy = &lookup_policy;
813 int generation, resends = 0;
814 struct ldlm_reply *lockrep;
815 enum lvb_type lvb_type = 0;
818 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
821 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* Select the inodebits policy matching the intent operation. */
824 saved_flags |= LDLM_FL_HAS_INTENT;
825 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
826 policy = &update_policy;
827 else if (it->it_op & IT_LAYOUT)
828 policy = &layout_policy;
829 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
830 policy = &getxattr_policy;
833 LASSERT(reqp == NULL);
/* Remember the import generation so eviction during resend is detected. */
835 generation = obddev->u.cli.cl_import->imp_generation;
839 /* The only way right now is FLOCK, in this case we hide flock
840 policy as lmm, but lmmsize is 0 */
841 LASSERT(lmm && lmmsize == 0);
842 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
844 policy = (ldlm_policy_data_t *)lmm;
845 res_id.name[3] = LDLM_FLOCK;
846 } else if (it->it_op & IT_OPEN) {
847 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
849 policy = &update_policy;
850 einfo->ei_cbdata = NULL;
852 } else if (it->it_op & IT_UNLINK) {
853 req = mdc_intent_unlink_pack(exp, it, op_data);
854 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
855 req = mdc_intent_getattr_pack(exp, it, op_data);
856 } else if (it->it_op & IT_READDIR) {
857 req = mdc_enqueue_pack(exp, 0);
858 } else if (it->it_op & IT_LAYOUT) {
859 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
861 req = mdc_intent_layout_pack(exp, it, op_data);
862 lvb_type = LVB_T_LAYOUT;
863 } else if (it->it_op & IT_GETXATTR) {
864 req = mdc_intent_getxattr_pack(exp, it, op_data);
871 RETURN(PTR_ERR(req));
873 if (req != NULL && it && it->it_op & IT_CREAT)
874 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
876 req->rq_no_retry_einprogress = 1;
/* On a manual resend, pin the request to the saved import generation. */
879 req->rq_generation_set = 1;
880 req->rq_import_generation = generation;
881 req->rq_sent = cfs_time_current_sec() + resends;
884 /* It is important to obtain rpc_lock first (if applicable), so that
885 * threads that are serialised with rpc_lock are not polluting our
886 * rpcs in flight counter. We do not do flock request limiting, though*/
888 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
889 rc = mdc_enter_request(&obddev->u.cli);
891 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
892 mdc_clear_replay_flag(req, 0);
893 ptlrpc_req_finished(req);
898 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
899 0, lvb_type, lockh, 0);
901 /* For flock requests we immediatelly return without further
902 delay and let caller deal with the rest, since rest of
903 this function metadata processing makes no sense for flock
904 requests anyway. But in case of problem during comms with
905 Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
906 can not rely on caller and this mainly for F_UNLCKs
907 (explicits or automatically generated by Kernel to clean
908 current FLocks upon exit) that can't be trashed */
909 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
910 (einfo->ei_type == LDLM_FLOCK) &&
911 (einfo->ei_mode == LCK_NL))
/* Release the in-flight slot and rpc lock taken above. */
916 mdc_exit_request(&obddev->u.cli);
917 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
920 CERROR("ldlm_cli_enqueue: %d\n", rc);
921 mdc_clear_replay_flag(req, rc);
922 ptlrpc_req_finished(req);
926 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
927 LASSERT(lockrep != NULL);
929 lockrep->lock_policy_res2 =
930 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
932 /* Retry the create infinitely when we get -EINPROGRESS from
933 * server. This is required by the new quota design. */
934 if (it && it->it_op & IT_CREAT &&
935 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
936 mdc_clear_replay_flag(req, rc);
937 ptlrpc_req_finished(req);
940 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
941 obddev->obd_name, resends, it->it_op,
942 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
944 if (generation == obddev->u.cli.cl_import->imp_generation) {
947 CDEBUG(D_HA, "resend cross eviction\n");
952 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Finish failed but we hold a lock reference: drop it and the request. */
954 if (lustre_handle_is_used(lockh)) {
955 ldlm_lock_decref(lockh, einfo->ei_mode);
956 memset(lockh, 0, sizeof(*lockh));
958 ptlrpc_req_finished(req);
/* Translate the server's intent dispositions into the rc the VFS layer
 * expects, keep request references for the later open/create phases, and
 * collapse duplicate locks by matching against an existing granted one. */
963 static int mdc_finish_intent_lock(struct obd_export *exp,
964 struct ptlrpc_request *request,
965 struct md_op_data *op_data,
966 struct lookup_intent *it,
967 struct lustre_handle *lockh)
969 struct lustre_handle old_lock;
970 struct mdt_body *mdt_body;
971 struct ldlm_lock *lock;
975 LASSERT(request != NULL);
976 LASSERT(request != LP_POISON);
977 LASSERT(request->rq_repmsg != LP_POISON);
979 if (it->it_op & IT_READDIR)
982 if (!it_disposition(it, DISP_IT_EXECD)) {
983 /* The server failed before it even started executing the
984 * intent, i.e. because it couldn't unpack the request. */
985 LASSERT(it->d.lustre.it_status != 0);
986 RETURN(it->d.lustre.it_status);
988 rc = it_open_error(DISP_IT_EXECD, it);
992 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
993 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
995 /* If we were revalidating a fid/name pair, mark the intent in
996 * case we fail and get called again from lookup */
997 if (fid_is_sane(&op_data->op_fid2) &&
998 it->it_create_mode & M_CHECK_STALE &&
999 it->it_op != IT_GETATTR) {
1000 /* Also: did we find the same inode? */
1001 /* sever can return one of two fids:
1002 * op_fid2 - new allocated fid - if file is created.
1003 * op_fid3 - existent fid - if file only open.
1004 * op_fid3 is saved in lmv_intent_open */
1005 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
1006 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
1007 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
1008 "\n", PFID(&op_data->op_fid2),
1009 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
1014 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1018 /* keep requests around for the multiple phases of the call
1019 * this shows the DISP_XX must guarantee we make it into the call
1021 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1022 it_disposition(it, DISP_OPEN_CREATE) &&
1023 !it_open_error(DISP_OPEN_CREATE, it)) {
1024 it_set_disposition(it, DISP_ENQ_CREATE_REF);
1025 ptlrpc_request_addref(request); /* balanced in ll_create_node */
1027 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1028 it_disposition(it, DISP_OPEN_OPEN) &&
1029 !it_open_error(DISP_OPEN_OPEN, it)) {
1030 it_set_disposition(it, DISP_ENQ_OPEN_REF);
1031 ptlrpc_request_addref(request); /* balanced in ll_file_open */
1032 /* BUG 11546 - eviction in the middle of open rpc processing */
1033 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
1036 if (it->it_op & IT_CREAT) {
1037 /* XXX this belongs in ll_create_it */
1038 } else if (it->it_op == IT_OPEN) {
1039 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1041 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
1044 /* If we already have a matching lock, then cancel the new
1045 * one. We have to set the data here instead of in
1046 * mdc_enqueue, because we need to use the child's inode as
1047 * the l_ast_data to match, and that's not available until
1048 * intent_finish has performed the iget().) */
1049 lock = ldlm_handle2lock(lockh);
1051 ldlm_policy_data_t policy = lock->l_policy_data;
1052 LDLM_DEBUG(lock, "matching against this");
1054 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
1055 &lock->l_resource->lr_name),
1056 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1057 PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
1058 LDLM_LOCK_PUT(lock);
1060 memcpy(&old_lock, lockh, sizeof(*lockh));
1061 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1062 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* Duplicate found: drop the new lock and keep the old handle. */
1063 ldlm_lock_decref_and_cancel(lockh,
1064 it->d.lustre.it_lock_mode);
1065 memcpy(lockh, &old_lock, sizeof(old_lock));
1066 it->d.lustre.it_lock_handle = lockh->cookie;
1069 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1070 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1071 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Check whether a lock covering the bits needed by @it is still held:
 * first revalidate the handle saved in the intent, otherwise match by fid
 * with a per-operation inodebits policy (switch case labels are on elided
 * lines — NOTE(review): confirm op-to-policy mapping against full source).
 * On success the intent's lock handle/mode are refreshed; on failure they
 * are zeroed. */
1075 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1076 struct lu_fid *fid, __u64 *bits)
1078 /* We could just return 1 immediately, but since we should only
1079 * be called in revalidate_it if we already have a lock, let's
1081 struct ldlm_res_id res_id;
1082 struct lustre_handle lockh;
1083 ldlm_policy_data_t policy;
1087 if (it->d.lustre.it_lock_handle) {
1088 lockh.cookie = it->d.lustre.it_lock_handle;
1089 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1091 fid_build_reg_res_name(fid, &res_id);
1092 switch (it->it_op) {
1094 /* File attributes are held under multiple bits:
1095 * nlink is under lookup lock, size and times are
1096 * under UPDATE lock and recently we've also got
1097 * a separate permissions lock for owner/group/acl that
1098 * were protected by lookup lock before.
1099 * Getattr must provide all of that information,
1100 * so we need to ensure we have all of those locks.
1101 * Unfortunately, if the bits are split across multiple
1102 * locks, there's no easy way to match all of them here,
1103 * so an extra RPC would be performed to fetch all
1104 * of those bits at once for now. */
1105 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1106 * but for old MDTs (< 2.4), permission is covered
1107 * by LOOKUP lock, so it needs to match all bits here.*/
1108 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1109 MDS_INODELOCK_LOOKUP |
1113 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1116 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1119 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1123 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1124 LDLM_IBITS, &policy,
1125 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1130 it->d.lustre.it_lock_handle = lockh.cookie;
1131 it->d.lustre.it_lock_mode = mode;
1133 it->d.lustre.it_lock_handle = 0;
1134 it->d.lustre.it_lock_mode = 0;
1141 * This long block is all about fixing up the lock and request state
1142 * so that it is correct as of the moment _before_ the operation was
1143 * applied; that way, the VFS will think that everything is normal and
1144 * call Lustre's regular VFS methods.
1146 * If we're performing a creation, that means that unless the creation
1147 * failed with EEXIST, we should fake up a negative dentry.
1149 * For everything else, we want to lookup to succeed.
1151 * One additional note: if CREATE or OPEN succeeded, we add an extra
1152 * reference to the request because we need to keep it around until
1153 * ll_create/ll_open gets called.
1155 * The server will return to us, in it_disposition, an indication of
1156 * exactly what d.lustre.it_status refers to.
1158 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1159 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1160 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1161 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1164 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1167 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1168 void *lmm, int lmmsize, struct lookup_intent *it,
1169 int lookup_flags, struct ptlrpc_request **reqp,
1170 ldlm_blocking_callback cb_blocking,
1171 __u64 extra_lock_flags)
1173 struct ldlm_enqueue_info einfo = {
1174 .ei_type = LDLM_IBITS,
1175 .ei_mode = it_to_lock_mode(it),
1176 .ei_cb_bl = cb_blocking,
1177 .ei_cb_cp = ldlm_completion_ast,
1179 struct lustre_handle lockh;
1184 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1185 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1186 op_data->op_name, PFID(&op_data->op_fid2),
1187 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Fast path: revalidate an existing lock instead of a new enqueue. */
1191 if (fid_is_sane(&op_data->op_fid2) &&
1192 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1193 /* We could just return 1 immediately, but since we should only
1194 * be called in revalidate_it if we already have a lock, let's
1196 it->d.lustre.it_lock_handle = 0;
1197 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1198 /* Only return failure if it was not GETATTR by cfid
1199 (from inode_revalidate) */
1200 if (rc || op_data->op_namelen != 0)
1204 /* For case if upper layer did not alloc fid, do it now. */
1205 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1206 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1208 CERROR("Can't alloc new fid, rc %d\n", rc);
1212 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh, lmm, lmmsize, NULL,
/* Hand the intent's saved request back to the caller before finishing. */
1217 *reqp = it->d.lustre.it_data;
1218 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Interpret callback for an asynchronous intent-getattr enqueue: completes
 * the DLM enqueue, finishes the intent lock state, and invokes the caller's
 * md_enqueue_info callback with the final result.
 *
 * NOTE(review): interior lines are missing from this extract (including the
 * tail of the parameter list with `args'/`rc', the assignment that is
 * presumed to set `it' from &minfo->mi_it, the error-path gotos, and the
 * function's closing brace); comments describe only the visible statements.
 */
1222 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1223 struct ptlrpc_request *req,
/* Unpack the async-args cookie stashed at enqueue time. */
1226 struct mdc_getattr_args *ga = args;
1227 struct obd_export *exp = ga->ga_exp;
1228 struct md_enqueue_info *minfo = ga->ga_minfo;
1229 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1230 struct lookup_intent *it;
1231 struct lustre_handle *lockh;
1232 struct obd_device *obddev;
1233 struct ldlm_reply *lockrep;
1234 __u64 flags = LDLM_FL_HAS_INTENT;
1238 lockh = &minfo->mi_lockh;
1240 obddev = class_exp2obd(exp);
/* Release the rpc-concurrency slot taken in mdc_intent_getattr_async(). */
1242 mdc_exit_request(&obddev->u.cli);
/* Fault-injection hook for testing the enqueue-failure path. */
1243 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
/* Complete the client-side enqueue; on failure, drop the replay flag so
 * the request is not resent during recovery. */
1246 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1247 &flags, NULL, 0, lockh, rc);
1249 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1250 mdc_clear_replay_flag(req, rc);
1254 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1255 LASSERT(lockrep != NULL);
/* Convert the server's intent status to host errno representation. */
1257 lockrep->lock_policy_res2 =
1258 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1260 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1264 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* einfo was heap-allocated by the enqueue initiator; this callback owns
 * and frees it, then hands the final rc to the statahead callback. */
1268 OBD_FREE_PTR(einfo);
1269 minfo->mi_cb(req, minfo, rc);
1273 int mdc_intent_getattr_async(struct obd_export *exp,
1274 struct md_enqueue_info *minfo,
1275 struct ldlm_enqueue_info *einfo)
1277 struct md_op_data *op_data = &minfo->mi_data;
1278 struct lookup_intent *it = &minfo->mi_it;
1279 struct ptlrpc_request *req;
1280 struct mdc_getattr_args *ga;
1281 struct obd_device *obddev = class_exp2obd(exp);
1282 struct ldlm_res_id res_id;
1283 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1284 * for statahead currently. Consider CMD in future, such two bits
1285 * maybe managed by different MDS, should be adjusted then. */
1286 ldlm_policy_data_t policy = {
1287 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1288 MDS_INODELOCK_UPDATE }
1291 __u64 flags = LDLM_FL_HAS_INTENT;
1294 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1295 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1296 ldlm_it2str(it->it_op), it->it_flags);
1298 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1299 req = mdc_intent_getattr_pack(exp, it, op_data);
1301 RETURN(PTR_ERR(req));
1303 rc = mdc_enter_request(&obddev->u.cli);
1305 ptlrpc_req_finished(req);
1309 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1310 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1312 mdc_exit_request(&obddev->u.cli);
1313 ptlrpc_req_finished(req);
1317 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1318 ga = ptlrpc_req_async_args(req);
1320 ga->ga_minfo = minfo;
1321 ga->ga_einfo = einfo;
1323 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1324 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);