/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_MDC

#ifdef __KERNEL__
# include <linux/module.h>
# include <linux/pagemap.h>
# include <linux/miscdevice.h>
# include <linux/init.h>
#else
# include <liblustre.h>
#endif

#include <lustre_acl.h>
#include <obd_class.h>
#include <lustre_dlm.h>
/* fid_res_name_eq() */
#include <lustre_fid.h>
#include <lprocfs_status.h>
#include "mdc_internal.h"

struct mdc_getattr_args {
        struct obd_export           *ga_exp;
        struct md_enqueue_info      *ga_minfo;
        struct ldlm_enqueue_info    *ga_einfo;
};

int it_disposition(struct lookup_intent *it, int flag)
{
        return it->d.lustre.it_disposition & flag;
}
EXPORT_SYMBOL(it_disposition);

void it_set_disposition(struct lookup_intent *it, int flag)
{
        it->d.lustre.it_disposition |= flag;
}
EXPORT_SYMBOL(it_set_disposition);

void it_clear_disposition(struct lookup_intent *it, int flag)
{
        it->d.lustre.it_disposition &= ~flag;
}
EXPORT_SYMBOL(it_clear_disposition);

int it_open_error(int phase, struct lookup_intent *it)
{
        if (it_disposition(it, DISP_OPEN_OPEN)) {
                if (phase >= DISP_OPEN_OPEN)
                        return it->d.lustre.it_status;
                return 0;
        }

        if (it_disposition(it, DISP_OPEN_CREATE)) {
                if (phase >= DISP_OPEN_CREATE)
                        return it->d.lustre.it_status;
                return 0;
        }

        if (it_disposition(it, DISP_LOOKUP_EXECD)) {
                if (phase >= DISP_LOOKUP_EXECD)
                        return it->d.lustre.it_status;
                return 0;
        }

        if (it_disposition(it, DISP_IT_EXECD)) {
                if (phase >= DISP_IT_EXECD)
                        return it->d.lustre.it_status;
                return 0;
        }

        CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
               it->d.lustre.it_status);

        return 0;
}
EXPORT_SYMBOL(it_open_error);

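/*
 * Hypothetical usage sketch (not part of the original file): callers walk
 * the DISP_* phases in order and stop at the first phase that reported an
 * error, which is how mdc_finish_intent_lock() below consumes these helpers.
 * The "it" here is assumed to be a lookup_intent already filled in by a
 * completed enqueue.
 *
 *      rc = it_open_error(DISP_IT_EXECD, it);              // intent executed?
 *      if (rc == 0)
 *              rc = it_open_error(DISP_LOOKUP_EXECD, it);  // lookup result
 *      if (rc == 0 && it_disposition(it, DISP_OPEN_OPEN))
 *              rc = it_open_error(DISP_OPEN_OPEN, it);     // open() result
 */
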
/* this must be called on a lockh that is known to have a referenced lock */
int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
                      __u64 *bits)
{
        struct ldlm_lock *lock;
        ENTRY;

        if (bits)
                *bits = 0;
        if (!*lockh)
                RETURN(0);

        lock = ldlm_handle2lock((struct lustre_handle *)lockh);

        LASSERT(lock != NULL);
        lock_res_and_lock(lock);
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
        lock->l_ast_data = data;
        if (bits)
                *bits = lock->l_policy_data.l_inodebits.bits;

        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);

        RETURN(0);
}

ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
                           const struct lu_fid *fid, ldlm_type_t type,
                           ldlm_policy_data_t *policy, ldlm_mode_t mode,
                           struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id;
        ldlm_mode_t rc;
        ENTRY;

        fid_build_reg_res_name(fid, &res_id);
        rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
                             &res_id, type, policy, mode, lockh, 0);
        RETURN(rc);
}

int mdc_cancel_unused(struct obd_export *exp,
                      const struct lu_fid *fid,
                      ldlm_policy_data_t *policy,
                      ldlm_mode_t mode,
                      ldlm_cancel_flags_t flags,
                      void *opaque)
{
        struct ldlm_res_id res_id;
        struct obd_device *obd = class_exp2obd(exp);
        int rc;
        ENTRY;

        fid_build_reg_res_name(fid, &res_id);
        rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
                                             policy, mode, flags, opaque);
        RETURN(rc);
}

int mdc_change_cbdata(struct obd_export *exp,
                      const struct lu_fid *fid,
                      ldlm_iterator_t it, void *data)
{
        struct ldlm_res_id res_id;
        ENTRY;

        fid_build_reg_res_name(fid, &res_id);
        ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
                              &res_id, it, data);
        RETURN(0);
}

/* find any ldlm lock of the inode in mdc
 * return 0 if not found, 1 if found, < 0 on error */
int mdc_find_cbdata(struct obd_export *exp,
                    const struct lu_fid *fid,
                    ldlm_iterator_t it, void *data)
{
        struct ldlm_res_id res_id;
        int rc = 0;
        ENTRY;

        fid_build_reg_res_name((struct lu_fid *)fid, &res_id);
        rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
                                   it, data);
        if (rc == LDLM_ITER_STOP)
                RETURN(1);
        else if (rc == LDLM_ITER_CONTINUE)
                RETURN(0);
        RETURN(rc);
}

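/*
 * Illustrative note (an assumption, not from the original source): the
 * ldlm_iterator_t passed to mdc_change_cbdata()/mdc_find_cbdata() follows
 * the ldlm contract visible above -- it returns LDLM_ITER_STOP to end the
 * walk (mdc_find_cbdata() then reports 1, "found") or LDLM_ITER_CONTINUE to
 * keep scanning (reported as 0, "not found").  A minimal sketch:
 *
 *      static int my_check_lock(struct ldlm_lock *lock, void *data)
 *      {
 *              return lock->l_ast_data == data ?
 *                      LDLM_ITER_STOP : LDLM_ITER_CONTINUE;
 *      }
 *      rc = mdc_find_cbdata(exp, fid, my_check_lock, inode);
 */
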
static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
{
        /* Don't hold error requests for replay. */
        if (req->rq_replay) {
                cfs_spin_lock(&req->rq_lock);
                req->rq_replay = 0;
                cfs_spin_unlock(&req->rq_lock);
        }
        if (rc && req->rq_transno != 0) {
                DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
                LBUG();
        }
}

/* Save a large LOV EA into the request buffer so that it is available
 * for replay.  We don't do this in the initial request because the
 * original request doesn't need this buffer (at most it sends just the
 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
 * buffer and may also be difficult to allocate and save a very large
 * request buffer for each open. (bug 5707)
 *
 * OOM here may cause recovery failure if lmm is needed (only for the
 * original open if the MDS crashed just when this client also OOM'd)
 * but this is incredibly unlikely, and questionable whether the client
 * could do MDS recovery under OOM anyways... */
static void mdc_realloc_openmsg(struct ptlrpc_request *req,
                                struct mdt_body *body)
{
        int     rc;

        /* FIXME: remove this explicit offset. */
        rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
                                        body->eadatasize);
        if (rc) {
                CERROR("Can't enlarge segment %d size to %d\n",
                       DLM_INTENT_REC_OFF + 4, body->eadatasize);
                body->valid &= ~OBD_MD_FLEASIZE;
                body->eadatasize = 0;
        }
}

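/*
 * Hypothetical illustration (not in the original file): mdc_realloc_openmsg()
 * is only needed on the replay path in mdc_finish_enqueue() below, when the
 * LOV EA returned by the MDS is larger than the EA buffer that was reserved
 * in the original open request, roughly:
 *
 *      if (req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT) <
 *          body->eadatasize)
 *              mdc_realloc_openmsg(req, body);   // grow client EA buffer
 *      else
 *              req_capsule_shrink(pill, &RMF_EADATA, body->eadatasize,
 *                                 RCL_CLIENT);   // trim to what was returned
 */
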
static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
                                                   struct lookup_intent *it,
                                                   struct md_op_data *op_data,
                                                   void *lmm, int lmmsize,
                                                   void *cb_data)
{
        struct ptlrpc_request *req;
        struct obd_device     *obddev = class_exp2obd(exp);
        struct ldlm_intent    *lit;
        CFS_LIST_HEAD(cancels);
        int                    count = 0;
        int                    mode;
        int                    rc;
        ENTRY;

        it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;

        /* XXX: openlock is not cancelled for cross-refs. */
        /* If inode is known, cancel conflicting OPEN locks. */
        if (fid_is_sane(&op_data->op_fid2)) {
                if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
                        mode = LCK_CW;
                else if (it->it_flags & FMODE_EXEC)
                        mode = LCK_PR;
                else
                        mode = LCK_CR;
                count = mdc_resource_get_unused(exp, &op_data->op_fid2,
                                                &cancels, mode,
                                                MDS_INODELOCK_OPEN);
        }

        /* If CREATE, cancel parent's UPDATE lock. */
        if (it->it_op & IT_CREAT)
                mode = LCK_EX;
        else
                mode = LCK_CR;
        count += mdc_resource_get_unused(exp, &op_data->op_fid1,
                                         &cancels, mode,
                                         MDS_INODELOCK_UPDATE);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_LDLM_INTENT_OPEN);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(ERR_PTR(-ENOMEM));
        }

        /* parent capability */
        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
        /* child capability, reserve the size according to parent capa, it will
         * be filled after we get the reply */
        mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);

        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                             op_data->op_namelen + 1);
        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
                             max(lmmsize, obddev->u.cli.cl_default_mds_easize));

        rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(ERR_PTR(rc));
        }

        cfs_spin_lock(&req->rq_lock);
        req->rq_replay = req->rq_import->imp_replayable;
        cfs_spin_unlock(&req->rq_lock);

        /* pack the intent */
        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
        lit->opc = (__u64)it->it_op;

        /* pack the intended request */
        mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
                      lmmsize);

        /* for remote client, fetch remote perm for current user */
        if (client_is_remote(exp))
                req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
                                     sizeof(struct mdt_remote_perm));
        ptlrpc_request_set_replen(req);
        RETURN(req);
}

static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
                                                     struct lookup_intent *it,
                                                     struct md_op_data *op_data)
{
        struct ptlrpc_request *req;
        struct obd_device     *obddev = class_exp2obd(exp);
        struct ldlm_intent    *lit;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_LDLM_INTENT_UNLINK);
        if (req == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                             op_data->op_namelen + 1);

        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(ERR_PTR(rc));
        }

        /* pack the intent */
        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
        lit->opc = (__u64)it->it_op;

        /* pack the intended request */
        mdc_unlink_pack(req, op_data);

        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
                             obddev->u.cli.cl_max_mds_easize);
        req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
                             obddev->u.cli.cl_max_mds_cookiesize);
        ptlrpc_request_set_replen(req);
        RETURN(req);
}

static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
                                                      struct lookup_intent *it,
                                                      struct md_op_data *op_data)
{
        struct ptlrpc_request *req;
        struct obd_device     *obddev = class_exp2obd(exp);
        obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
                                       OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
                                       OBD_MD_FLMDSCAPA | OBD_MD_MEA |
                                       (client_is_remote(exp) ?
                                               OBD_MD_FLRMTPERM : OBD_MD_FLACL);
        struct ldlm_intent    *lit;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_LDLM_INTENT_GETATTR);
        if (req == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                             op_data->op_namelen + 1);

        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(ERR_PTR(rc));
        }

        /* pack the intent */
        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
        lit->opc = (__u64)it->it_op;

        /* pack the intended request */
        mdc_getattr_pack(req, valid, it->it_flags, op_data,
                         obddev->u.cli.cl_max_mds_easize);

        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
                             obddev->u.cli.cl_max_mds_easize);
        if (client_is_remote(exp))
                req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
                                     sizeof(struct mdt_remote_perm));
        ptlrpc_request_set_replen(req);
        RETURN(req);
}

static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
{
        struct ptlrpc_request *req;
        int rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
        if (req == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(ERR_PTR(rc));
        }

        ptlrpc_request_set_replen(req);
        RETURN(req);
}

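/*
 * Summary sketch (an editorial note, not original code): every *_pack()
 * helper above follows the same request-construction pattern, condensed in
 * the hypothetical outline below; the names mirror the calls used in the
 * real helpers.
 *
 *      req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
 *      // size variable-length client fields (name, EA, capas) ...
 *      rc = ldlm_prep_enqueue_req(exp, req, cancels, count);
 *      lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
 *      lit->opc = (__u64)it->it_op;          // record the intent opcode
 *      // pack the operation-specific body, size reply fields ...
 *      ptlrpc_request_set_replen(req);       // finalize reply length
 */
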
static int mdc_finish_enqueue(struct obd_export *exp,
                              struct ptlrpc_request *req,
                              struct ldlm_enqueue_info *einfo,
                              struct lookup_intent *it,
                              struct lustre_handle *lockh,
                              int rc)
{
        struct req_capsule  *pill = &req->rq_pill;
        struct ldlm_request *lockreq;
        struct ldlm_reply   *lockrep;
        __u64                bits = 0;
        struct lustre_intent_data *intent = &it->d.lustre;
        ENTRY;

        LASSERT(rc >= 0);
        /* Similarly, if we're going to replay this request, we don't want to
         * actually get a lock, just perform the intent. */
        if (req->rq_transno || req->rq_replay) {
                lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
                lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
        }

        if (rc == ELDLM_LOCK_ABORTED) {
                einfo->ei_mode = 0;
                memset(lockh, 0, sizeof(*lockh));
                rc = 0;
        } else { /* rc = 0 */
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
                LASSERT(lock);

                /* If the server gave us back a different lock mode, we should
                 * fix up our variables. */
                if (lock->l_req_mode != einfo->ei_mode) {
                        ldlm_lock_addref(lockh, lock->l_req_mode);
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        einfo->ei_mode = lock->l_req_mode;
                }
                bits = lock->l_policy_data.l_inodebits.bits;
                LDLM_LOCK_PUT(lock);
        }

        lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */

        intent->it_disposition = (int)lockrep->lock_policy_res1;
        intent->it_status = (int)lockrep->lock_policy_res2;
        intent->it_lock_mode = einfo->ei_mode;
        intent->it_lock_handle = lockh->cookie;
        intent->it_data = req;

        if (intent->it_status < 0 && req->rq_replay)
                mdc_clear_replay_flag(req, intent->it_status);

        /* If we're doing an IT_OPEN which did not result in an actual
         * successful open, then we need to remove the bit which saves
         * this request for unconditional replay.
         *
         * It's important that we do this first!  Otherwise we might exit the
         * function without doing so, and try to replay a failed create
         * (bug 3440) */
        if (it->it_op & IT_OPEN && req->rq_replay &&
            (!it_disposition(it, DISP_OPEN_OPEN) || intent->it_status != 0))
                mdc_clear_replay_flag(req, intent->it_status);

        DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
                  it->it_op, intent->it_disposition, intent->it_status);

        /* We know what to expect, so we do any byte flipping required here */
        if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                struct mdt_body *body;

                body = req_capsule_server_get(pill, &RMF_MDT_BODY);
                if (body == NULL) {
                        CERROR("Can't swab mdt_body\n");
                        RETURN(-EPROTO);
                }

                if (it_disposition(it, DISP_OPEN_OPEN) &&
                    !it_open_error(DISP_OPEN_OPEN, it)) {
                        /*
                         * If this is a successful OPEN request, we need to set
                         * replay handler and data early, so that if replay
                         * happens immediately after swabbing below, new reply
                         * is swabbed by that handler correctly.
                         */
                        mdc_set_open_replay_data(NULL, NULL, req);
                }

                /* TODO: make sure LAYOUT lock must be granted along with EA */

                if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
                        void *eadata;

                        mdc_update_max_ea_from_body(exp, body);

                        /*
                         * The eadata is opaque; just check that it is there.
                         * Eventually, obd_unpackmd() will check the contents.
                         */
                        eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
                                                              body->eadatasize);
                        if (eadata == NULL)
                                RETURN(-EPROTO);

                        /*
                         * We save the reply LOV EA in case we have to replay a
                         * create for recovery.  If we didn't allocate a large
                         * enough request buffer above we need to reallocate it
                         * here to hold the actual LOV EA.
                         *
                         * Do not save the LOV EA if the request is not going
                         * to be replayed (for example, an error one).
                         */
                        if ((it->it_op & IT_OPEN) && req->rq_replay) {
                                void *lmm;

                                if (req_capsule_get_size(pill, &RMF_EADATA,
                                                         RCL_CLIENT) <
                                    body->eadatasize)
                                        mdc_realloc_openmsg(req, body);
                                else
                                        req_capsule_shrink(pill, &RMF_EADATA,
                                                           body->eadatasize,
                                                           RCL_CLIENT);

                                req_capsule_set_size(pill, &RMF_EADATA,
                                                     RCL_CLIENT,
                                                     body->eadatasize);

                                lmm = req_capsule_client_get(pill, &RMF_EADATA);
                                if (lmm)
                                        memcpy(lmm, eadata, body->eadatasize);
                        }
                }

                if (body->valid & OBD_MD_FLRMTPERM) {
                        struct mdt_remote_perm *perm;

                        LASSERT(client_is_remote(exp));
                        perm = req_capsule_server_swab_get(pill, &RMF_ACL,
                                                lustre_swab_mdt_remote_perm);
                        if (perm == NULL)
                                RETURN(-EPROTO);
                }
                if (body->valid & OBD_MD_FLMDSCAPA) {
                        struct lustre_capa *capa, *p;

                        capa = req_capsule_server_get(pill, &RMF_CAPA1);
                        if (capa == NULL)
                                RETURN(-EPROTO);

                        if (it->it_op & IT_OPEN) {
                                /* client fid capa will be checked in replay */
                                p = req_capsule_client_get(pill, &RMF_CAPA2);
                                LASSERT(p);
                                *p = *capa;
                        }
                }
                if (body->valid & OBD_MD_FLOSSCAPA) {
                        struct lustre_capa *capa;

                        capa = req_capsule_server_get(pill, &RMF_CAPA2);
                        if (capa == NULL)
                                RETURN(-EPROTO);
                }
        } else if (it->it_op & IT_LAYOUT) {
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);

                if (lock != NULL && lock->l_lvb_data == NULL) {
                        int lvb_len;

                        /* maybe the lock was granted right away and layout
                         * is packed into RMF_DLM_LVB of req */
                        lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB,
                                                       RCL_SERVER);
                        if (lvb_len > 0) {
                                void *lvb;
                                void *lmm;

                                lvb = req_capsule_server_get(pill,
                                                             &RMF_DLM_LVB);
                                if (lvb == NULL) {
                                        LDLM_LOCK_PUT(lock);
                                        RETURN(-EPROTO);
                                }

                                OBD_ALLOC_LARGE(lmm, lvb_len);
                                if (lmm == NULL) {
                                        LDLM_LOCK_PUT(lock);
                                        RETURN(-ENOMEM);
                                }
                                memcpy(lmm, lvb, lvb_len);

                                /* install lvb_data */
                                lock_res_and_lock(lock);
                                LASSERT(lock->l_lvb_data == NULL);
                                lock->l_lvb_data = lmm;
                                lock->l_lvb_len = lvb_len;
                                unlock_res_and_lock(lock);
                        }
                }

                if (lock != NULL)
                        LDLM_LOCK_PUT(lock);
        }

        RETURN(rc);
}

/* We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type. */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                struct lookup_intent *it, struct md_op_data *op_data,
                struct lustre_handle *lockh, void *lmm, int lmmsize,
                struct ptlrpc_request **reqp, int extra_lock_flags)
{
        struct obd_device     *obddev = class_exp2obd(exp);
        struct ptlrpc_request *req = NULL;
        int                    flags, saved_flags = extra_lock_flags;
        int                    rc;
        struct ldlm_res_id res_id;
        static const ldlm_policy_data_t lookup_policy =
                            { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
        static const ldlm_policy_data_t update_policy =
                            { .l_inodebits = { MDS_INODELOCK_UPDATE } };
        static const ldlm_policy_data_t layout_policy =
                            { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
        ldlm_policy_data_t const *policy = &lookup_policy;
        int                    generation, resends = 0;
        struct ldlm_reply     *lockrep;
        ENTRY;

        LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
                 einfo->ei_type);

        fid_build_reg_res_name(&op_data->op_fid1, &res_id);

        if (it) {
                saved_flags |= LDLM_FL_HAS_INTENT;
                if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                        policy = &update_policy;
                else if (it->it_op & IT_LAYOUT)
                        policy = &layout_policy;
        }

        LASSERT(reqp == NULL);

        generation = obddev->u.cli.cl_import->imp_generation;
resend:
        flags = saved_flags;
        if (!it) {
                /* The only way right now is FLOCK, in this case we hide flock
                   policy as lmm, but lmmsize is 0 */
                LASSERT(lmm && lmmsize == 0);
                LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
                         einfo->ei_type);
                policy = (ldlm_policy_data_t *)lmm;
                res_id.name[3] = LDLM_FLOCK;
        } else if (it->it_op & IT_OPEN) {
                req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
                                           einfo->ei_cbdata);
                policy = &update_policy;
                einfo->ei_cbdata = NULL;
                lmm = NULL;
        } else if (it->it_op & IT_UNLINK)
                req = mdc_intent_unlink_pack(exp, it, op_data);
        else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
                req = mdc_intent_getattr_pack(exp, it, op_data);
        else if (it->it_op & (IT_READDIR | IT_LAYOUT))
                req = ldlm_enqueue_pack(exp);
        else {
                LBUG();
                RETURN(-EINVAL);
        }

        if (IS_ERR(req))
                RETURN(PTR_ERR(req));

        if (resends) {
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                req->rq_sent = cfs_time_current_sec() + resends;
        }

        /* It is important to obtain rpc_lock first (if applicable), so that
         * threads that are serialised with rpc_lock are not polluting our
         * rpcs in flight counter. We do not do flock request limiting, though*/
        if (it) {
                mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
                rc = mdc_enter_request(&obddev->u.cli);
                if (rc != 0) {
                        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
                        mdc_clear_replay_flag(req, 0);
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
                              0, lockh, 0);
        if (!it) {
                /* For flock requests we immediately return without further
                   delay and let the caller deal with the rest, since the rest
                   of this function's metadata processing makes no sense for
                   flock requests anyway */
                RETURN(rc);
        }

        mdc_exit_request(&obddev->u.cli);
        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);

        if (rc < 0) {
                CERROR("ldlm_cli_enqueue: %d\n", rc);
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                RETURN(rc);
        }

        lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL);

        /* Retry the create infinitely when we get -EINPROGRESS from
         * server. This is required by the new quota design. */
        if (it && it->it_op & IT_CREAT &&
            (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                resends++;

                CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
                       obddev->obd_name, resends, it->it_op,
                       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

                if (generation == obddev->u.cli.cl_import->imp_generation) {
                        goto resend;
                } else {
                        CDEBUG(D_HA, "resend cross eviction\n");
                        RETURN(-EIO);
                }
        }

        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);

        RETURN(rc);
}

static int mdc_finish_intent_lock(struct obd_export *exp,
                                  struct ptlrpc_request *request,
                                  struct md_op_data *op_data,
                                  struct lookup_intent *it,
                                  struct lustre_handle *lockh)
{
        struct lustre_handle old_lock;
        struct mdt_body *mdt_body;
        struct ldlm_lock *lock;
        int rc;
        ENTRY;

        LASSERT(request != NULL);
        LASSERT(request != LP_POISON);
        LASSERT(request->rq_repmsg != LP_POISON);

        if (!it_disposition(it, DISP_IT_EXECD)) {
                /* The server failed before it even started executing the
                 * intent, i.e. because it couldn't unpack the request. */
                LASSERT(it->d.lustre.it_status != 0);
                RETURN(it->d.lustre.it_status);
        }
        rc = it_open_error(DISP_IT_EXECD, it);
        if (rc)
                RETURN(rc);

        mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
        LASSERT(mdt_body != NULL); /* mdc_enqueue checked */

        /* If we were revalidating a fid/name pair, mark the intent in
         * case we fail and get called again from lookup */
        if (fid_is_sane(&op_data->op_fid2) &&
            it->it_create_mode & M_CHECK_STALE &&
            it->it_op != IT_GETATTR) {
                it_set_disposition(it, DISP_ENQ_COMPLETE);

                /* Also: did we find the same inode? */
                /* server can return one of two fids:
                 * op_fid2 - new allocated fid - if file is created.
                 * op_fid3 - existent fid - if file only open.
                 * op_fid3 is saved in lmv_intent_open */
                if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
                    (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
                        CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
                               "\n", PFID(&op_data->op_fid2),
                               PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
                        RETURN(-ESTALE);
                }
        }

        rc = it_open_error(DISP_LOOKUP_EXECD, it);
        if (rc)
                RETURN(rc);

        /* keep requests around for the multiple phases of the call
         * this shows the DISP_XX must guarantee we make it into the call
         */
        if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
            it_disposition(it, DISP_OPEN_CREATE) &&
            !it_open_error(DISP_OPEN_CREATE, it)) {
                it_set_disposition(it, DISP_ENQ_CREATE_REF);
                ptlrpc_request_addref(request); /* balanced in ll_create_node */
        }
        if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
            it_disposition(it, DISP_OPEN_OPEN) &&
            !it_open_error(DISP_OPEN_OPEN, it)) {
                it_set_disposition(it, DISP_ENQ_OPEN_REF);
                ptlrpc_request_addref(request); /* balanced in ll_file_open */
                /* BUG 11546 - eviction in the middle of open rpc processing */
                OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
        }

        if (it->it_op & IT_CREAT) {
                /* XXX this belongs in ll_create_it */
        } else if (it->it_op == IT_OPEN) {
                LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
        } else {
                LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
        }

        /* If we already have a matching lock, then cancel the new
         * one.  We have to set the data here instead of in
         * mdc_enqueue, because we need to use the child's inode as
         * the l_ast_data to match, and that's not available until
         * intent_finish has performed the iget().) */
        lock = ldlm_handle2lock(lockh);
        if (lock) {
                ldlm_policy_data_t policy = lock->l_policy_data;
                LDLM_DEBUG(lock, "matching against this");

                LASSERTF(fid_res_name_eq(&mdt_body->fid1,
                                         &lock->l_resource->lr_name),
                         "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
                         (unsigned long)lock->l_resource->lr_name.name[0],
                         (unsigned long)lock->l_resource->lr_name.name[1],
                         (unsigned long)lock->l_resource->lr_name.name[2],
                         (unsigned long)fid_seq(&mdt_body->fid1),
                         (unsigned long)fid_oid(&mdt_body->fid1),
                         (unsigned long)fid_ver(&mdt_body->fid1));
                LDLM_LOCK_PUT(lock);

                memcpy(&old_lock, lockh, sizeof(*lockh));
                if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
                                    LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
                        ldlm_lock_decref_and_cancel(lockh,
                                                    it->d.lustre.it_lock_mode);
                        memcpy(lockh, &old_lock, sizeof(old_lock));
                        it->d.lustre.it_lock_handle = lockh->cookie;
                }
        }
        CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
               op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
               it->d.lustre.it_status, it->d.lustre.it_disposition, rc);

        RETURN(rc);
}

int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
                        struct lu_fid *fid, __u64 *bits)
{
        /* We could just return 1 immediately, but since we should only
         * be called in revalidate_it if we already have a lock, let's
         * verify that. */
        struct ldlm_res_id res_id;
        struct lustre_handle lockh;
        ldlm_policy_data_t policy;
        ldlm_mode_t mode;
        ENTRY;

        if (it->d.lustre.it_lock_handle) {
                lockh.cookie = it->d.lustre.it_lock_handle;
                mode = ldlm_revalidate_lock_handle(&lockh, bits);
        } else {
                fid_build_reg_res_name(fid, &res_id);
                switch (it->it_op) {
                case IT_GETATTR:
                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
                        break;
                case IT_LAYOUT:
                        policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
                        break;
                default:
                        policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
                        break;
                }
                mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                                       LDLM_FL_BLOCK_GRANTED, &res_id,
                                       LDLM_IBITS, &policy,
                                       LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
        }

        if (mode) {
                it->d.lustre.it_lock_handle = lockh.cookie;
                it->d.lustre.it_lock_mode = mode;
        } else {
                it->d.lustre.it_lock_handle = 0;
                it->d.lustre.it_lock_mode = 0;
        }

        RETURN(!!mode);
}

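/*
 * Hypothetical caller sketch (not original code): mdc_intent_lock() below
 * uses this helper to short-circuit a revalidation when a usable lock is
 * already cached, roughly:
 *
 *      it->d.lustre.it_lock_handle = 0;
 *      rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
 *      if (rc)   // 1: a granted LOOKUP/UPDATE/LAYOUT lock was found,
 *                //    so no new intent RPC needs to be sent
 *              return rc;
 */
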
/*
 * This long block is all about fixing up the lock and request state
 * so that it is correct as of the moment _before_ the operation was
 * applied; that way, the VFS will think that everything is normal and
 * call Lustre's regular VFS methods.
 *
 * If we're performing a creation, that means that unless the creation
 * failed with EEXIST, we should fake up a negative dentry.
 *
 * For everything else, we want the lookup to succeed.
 *
 * One additional note: if CREATE or OPEN succeeded, we add an extra
 * reference to the request because we need to keep it around until
 * ll_create/ll_open gets called.
 *
 * The server will return to us, in it_disposition, an indication of
 * exactly what d.lustre.it_status refers to.
 *
 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
 * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * was successful.
 *
 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
 * child lookup.
 */
int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                    void *lmm, int lmmsize, struct lookup_intent *it,
                    int lookup_flags, struct ptlrpc_request **reqp,
                    ldlm_blocking_callback cb_blocking,
                    int extra_lock_flags)
{
        struct lustre_handle lockh;
        int rc = 0;
        ENTRY;

        LASSERT(it);

        CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
               ", intent: %s flags %#o\n", op_data->op_namelen,
               op_data->op_name, PFID(&op_data->op_fid2),
               PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
               it->it_flags);

        lockh.cookie = 0;
        if (fid_is_sane(&op_data->op_fid2) &&
            (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
                /* We could just return 1 immediately, but since we should only
                 * be called in revalidate_it if we already have a lock, let's
                 * verify that. */
                it->d.lustre.it_lock_handle = 0;
                rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
                /* Only return failure if it was not GETATTR by cfid
                   (from inode_revalidate) */
                if (rc || op_data->op_namelen != 0)
                        RETURN(rc);
        }

        /* lookup_it may be called only after revalidate_it has run, because
         * revalidate_it cannot return errors, only zero.  Returning zero causes
         * this call to lookup, which *can* return an error.
         *
         * We only want to execute the request associated with the intent one
         * time, however, so don't send the request again.  Instead, skip past
         * this and use the request from revalidate.  In this case, revalidate
         * never dropped its reference, so the refcounts are all OK */
        if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
                struct ldlm_enqueue_info einfo =
                        { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
                          ldlm_completion_ast, NULL, NULL, NULL };

                /* For the case where the upper layer did not allocate the fid,
                 * do it now. */
                if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
                        rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
                        if (rc < 0) {
                                CERROR("Can't alloc new fid, rc %d\n", rc);
                                RETURN(rc);
                        }
                }
                rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
                                 lmm, lmmsize, NULL, extra_lock_flags);
                if (rc < 0)
                        RETURN(rc);
        } else if (!fid_is_sane(&op_data->op_fid2) ||
                   !(it->it_create_mode & M_CHECK_STALE)) {
                /* DISP_ENQ_COMPLETE set means there is extra reference on
                 * request referenced from this intent, saved for subsequent
                 * lookup.  This path is executed when we proceed to this
                 * lookup, so we clear DISP_ENQ_COMPLETE */
                it_clear_disposition(it, DISP_ENQ_COMPLETE);
        }
        *reqp = it->d.lustre.it_data;
        rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
        RETURN(rc);
}

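/*
 * Editorial sketch (not from the original source): a caller of
 * mdc_intent_lock() typically decodes the result with the disposition
 * helpers documented above, along the lines of:
 *
 *      rc = mdc_intent_lock(exp, op_data, lmm, lmmsize, it, flags,
 *                           &req, blocking_ast, 0);      // blocking_ast is
 *                                                        // a caller callback
 *      if (rc == 0 && it_disposition(it, DISP_OPEN_OPEN))
 *              rc = it_open_error(DISP_OPEN_OPEN, it);   // rc of the open()
 *      // on success the request stays referenced (DISP_ENQ_OPEN_REF /
 *      // DISP_ENQ_CREATE_REF) until ll_file_open()/ll_create_node() runs
 */
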
static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
                                              struct ptlrpc_request *req,
                                              void *args, int rc)
{
        struct mdc_getattr_args  *ga = args;
        struct obd_export        *exp = ga->ga_exp;
        struct md_enqueue_info   *minfo = ga->ga_minfo;
        struct ldlm_enqueue_info *einfo = ga->ga_einfo;
        struct lookup_intent     *it;
        struct lustre_handle     *lockh;
        struct obd_device        *obddev;
        int                       flags = LDLM_FL_HAS_INTENT;
        ENTRY;

        it    = &minfo->mi_it;
        lockh = &minfo->mi_lockh;

        obddev = class_exp2obd(exp);

        mdc_exit_request(&obddev->u.cli);
        if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
                rc = -ETIMEDOUT;

        rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
                                   &flags, NULL, 0, lockh, rc);
        if (rc < 0) {
                CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
                mdc_clear_replay_flag(req, rc);
                GOTO(out, rc);
        }

        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
        if (rc)
                GOTO(out, rc);

        rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
        EXIT;

out:
        OBD_FREE_PTR(einfo);
        minfo->mi_cb(req, minfo, rc);
        return 0;
}

int mdc_intent_getattr_async(struct obd_export *exp,
                             struct md_enqueue_info *minfo,
                             struct ldlm_enqueue_info *einfo)
{
        struct md_op_data       *op_data = &minfo->mi_data;
        struct lookup_intent    *it = &minfo->mi_it;
        struct ptlrpc_request   *req;
        struct mdc_getattr_args *ga;
        struct obd_device       *obddev = class_exp2obd(exp);
        struct ldlm_res_id       res_id;
        /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
         *     for statahead currently. Consider CMD in future, such two bits
         *     maybe managed by different MDS, should be adjusted then. */
        ldlm_policy_data_t       policy = {
                                        .l_inodebits = { MDS_INODELOCK_LOOKUP |
                                                         MDS_INODELOCK_UPDATE }
                                 };
        int                      rc = 0;
        int                      flags = LDLM_FL_HAS_INTENT;
        ENTRY;

        CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
               op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
               ldlm_it2str(it->it_op), it->it_flags);

        fid_build_reg_res_name(&op_data->op_fid1, &res_id);
        req = mdc_intent_getattr_pack(exp, it, op_data);
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));

        rc = mdc_enter_request(&obddev->u.cli);
        if (rc != 0) {
                ptlrpc_req_finished(req);
                RETURN(rc);
        }

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
                              0, &minfo->mi_lockh, 1);
        if (rc < 0) {
                mdc_exit_request(&obddev->u.cli);
                ptlrpc_req_finished(req);
                RETURN(rc);
        }

        CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
        ga = ptlrpc_req_async_args(req);
        ga->ga_exp = exp;
        ga->ga_minfo = minfo;
        ga->ga_einfo = einfo;

        req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);

        RETURN(0);
}