4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
45 # include <liblustre.h>
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/* Context handed to the async getattr interpret callback
 * (mdc_intent_getattr_async_interpret) via rq_async_args, so it can
 * finish the enqueue after the RPC completes.
 * NOTE(review): the struct's closing brace is not visible in this extract. */
56 struct mdc_getattr_args {
/* export the intent RPC was sent on */
57 struct obd_export *ga_exp;
/* caller's enqueue info; minfo->mi_cb is invoked on completion */
58 struct md_enqueue_info *ga_minfo;
/* heap-allocated einfo, freed by the interpret callback (OBD_FREE_PTR) */
59 struct ldlm_enqueue_info *ga_einfo;
62 int it_disposition(struct lookup_intent *it, int flag)
64 return it->d.lustre.it_disposition & flag;
66 EXPORT_SYMBOL(it_disposition);
68 void it_set_disposition(struct lookup_intent *it, int flag)
70 it->d.lustre.it_disposition |= flag;
72 EXPORT_SYMBOL(it_set_disposition);
74 void it_clear_disposition(struct lookup_intent *it, int flag)
76 it->d.lustre.it_disposition &= ~flag;
78 EXPORT_SYMBOL(it_clear_disposition);
/* Return the server status (it_status) relevant to open phase @phase.
 * Dispositions are checked from most specific (DISP_OPEN_OPEN) to least
 * specific (DISP_IT_EXECD); once the deepest disposition the server
 * executed is found, it_status is returned if @phase has reached it.
 * NOTE(review): intermediate "return 0" / closing-brace lines and the
 * final return after the CERROR are not visible in this extract. */
80 int it_open_error(int phase, struct lookup_intent *it)
82 if (it_disposition(it, DISP_OPEN_OPEN)) {
83 if (phase >= DISP_OPEN_OPEN)
84 return it->d.lustre.it_status;
89 if (it_disposition(it, DISP_OPEN_CREATE)) {
90 if (phase >= DISP_OPEN_CREATE)
91 return it->d.lustre.it_status;
96 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97 if (phase >= DISP_LOOKUP_EXECD)
98 return it->d.lustre.it_status;
103 if (it_disposition(it, DISP_IT_EXECD)) {
104 if (phase >= DISP_IT_EXECD)
105 return it->d.lustre.it_status;
/* none of the expected dispositions were set - log the raw state */
109 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110 it->d.lustre.it_status);
114 EXPORT_SYMBOL(it_open_error);
116 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach inode @data to the lock's l_ast_data under the resource lock.
 * If the lock already carries a *different* inode, assert that the old
 * inode is being freed (I_FREEING) - anything else would mean two live
 * inodes claim the same lock.
 * NOTE(review): the signature is truncated in this extract; a final
 * out-parameter (apparently "__u64 *bits", written below from
 * l_policy_data.l_inodebits.bits) is not visible, nor is the NULL
 * check guarding that write. */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
120 struct ldlm_lock *lock;
129 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131 LASSERT(lock != NULL);
132 lock_res_and_lock(lock);
134 if (lock->l_ast_data && lock->l_ast_data != data) {
135 struct inode *new_inode = data;
136 struct inode *old_inode = lock->l_ast_data;
/* replacing ast_data is only legal while the old inode is dying */
137 LASSERTF(old_inode->i_state & I_FREEING,
138 "Found existing inode %p/%lu/%u state %lu in lock: "
139 "setting data to %p/%lu/%u\n", old_inode,
140 old_inode->i_ino, old_inode->i_generation,
142 new_inode, new_inode->i_ino, new_inode->i_generation);
145 lock->l_ast_data = data;
/* report the inodebits this lock protects to the caller */
147 *bits = lock->l_policy_data.l_inodebits.bits;
149 unlock_res_and_lock(lock);
/* Look for an already-granted DLM lock matching @fid/@type/@policy/@mode
 * in this export's namespace; on success the handle is returned through
 * @lockh.  The resource name is derived from the fid.
 * NOTE(review): the declaration of rc and the return statement are not
 * visible in this extract. */
155 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
156 const struct lu_fid *fid, ldlm_type_t type,
157 ldlm_policy_data_t *policy, ldlm_mode_t mode,
158 struct lustre_handle *lockh)
160 struct ldlm_res_id res_id;
164 fid_build_reg_res_name(fid, &res_id);
165 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166 &res_id, type, policy, mode, lockh, 0);
/* Cancel all unused locks on the resource derived from @fid that match
 * @policy, via ldlm_cli_cancel_unused_resource().
 * NOTE(review): two parameters (apparently "mode" and "opaque", both
 * passed through below) are missing from the visible signature, as is
 * the return. */
170 int mdc_cancel_unused(struct obd_export *exp,
171 const struct lu_fid *fid,
172 ldlm_policy_data_t *policy,
174 ldlm_cancel_flags_t flags,
177 struct ldlm_res_id res_id;
178 struct obd_device *obd = class_exp2obd(exp);
183 fid_build_reg_res_name(fid, &res_id);
184 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
185 policy, mode, flags, opaque);
/* Apply iterator @it (with closure @data) to every lock on the fid's
 * resource - used to change per-lock callback data.
 * NOTE(review): the trailing arguments of the ldlm_resource_iterate()
 * call and the return are not visible in this extract. */
189 int mdc_change_cbdata(struct obd_export *exp,
190 const struct lu_fid *fid,
191 ldlm_iterator_t it, void *data)
193 struct ldlm_res_id res_id;
196 fid_build_reg_res_name(fid, &res_id);
197 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
204 /* find any ldlm lock of the inode in mdc
/* Walk the fid's resource with iterator @it; the iterator's
 * LDLM_ITER_STOP / LDLM_ITER_CONTINUE result is mapped to this
 * function's return value (the mapped return statements are not
 * visible in this extract).
 * Note the cast below drops const from @fid for the non-const
 * fid_build_reg_res_name() prototype. */
208 int mdc_find_cbdata(struct obd_export *exp,
209 const struct lu_fid *fid,
210 ldlm_iterator_t it, void *data)
212 struct ldlm_res_id res_id;
216 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
217 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
219 if (rc == LDLM_ITER_STOP)
221 else if (rc == LDLM_ITER_CONTINUE)
/* Drop the replay flag on @req when the operation failed with @rc, so
 * failed requests are not replayed after recovery.  The flag clear
 * itself (presumably "req->rq_replay = 0") sits between the spin_lock/
 * spin_unlock pair but is not visible in this extract.  A non-zero
 * transno together with an error is unexpected and logged. */
226 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
228 /* Don't hold error requests for replay. */
229 if (req->rq_replay) {
230 spin_lock(&req->rq_lock);
232 spin_unlock(&req->rq_lock);
234 if (rc && req->rq_transno != 0) {
235 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
240 /* Save a large LOV EA into the request buffer so that it is available
241 * for replay. We don't do this in the initial request because the
242 * original request doesn't need this buffer (at most it sends just the
243 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
244 * buffer and may also be difficult to allocate and save a very large
245 * request buffer for each open. (bug 5707)
247 * OOM here may cause recovery failure if lmm is needed (only for the
248 * original open if the MDS crashed just when this client also OOM'd)
249 * but this is incredibly unlikely, and questionable whether the client
250 * could do MDS recovery under OOM anyways... */
251 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
252 struct mdt_body *body)
/* Grow the EADATA segment of the already-built open request to
 * body->eadatasize; on failure (rc checked on a line not visible in
 * this extract), degrade gracefully by dropping the EA from the reply
 * rather than failing the open. */
256 /* FIXME: remove this explicit offset. */
257 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
260 CERROR("Can't enlarge segment %d size to %d\n",
261 DLM_INTENT_REC_OFF + 4, body->eadatasize);
/* couldn't save the EA for replay: pretend there is none */
262 body->valid &= ~OBD_MD_FLEASIZE;
263 body->eadatasize = 0;
/* Build an LDLM_INTENT_OPEN request: gather cancels for locks that
 * conflict with the open (child OPEN locks if the child fid is known,
 * parent UPDATE lock for IT_CREAT), allocate the request, size the
 * variable fields, and pack the ldlm intent plus the open body.
 * Returns the request, ERR_PTR(-ENOMEM) on allocation failure, or
 * (on a path not visible here) an error from ldlm_prep_enqueue_req().
 * NOTE(review): several lines - the trailing "int mode" style
 * parameter, inodebits arguments to mdc_resource_get_unused(), error
 * checks, and RETURN(req) - are missing from this extract. */
267 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
268 struct lookup_intent *it,
269 struct md_op_data *op_data,
270 void *lmm, int lmmsize,
273 struct ptlrpc_request *req;
274 struct obd_device *obddev = class_exp2obd(exp);
275 struct ldlm_intent *lit;
276 CFS_LIST_HEAD(cancels);
/* open always operates on a regular file from the lock standpoint */
282 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
284 /* XXX: openlock is not cancelled for cross-refs. */
285 /* If inode is known, cancel conflicting OPEN locks. */
286 if (fid_is_sane(&op_data->op_fid2)) {
287 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
290 else if (it->it_flags & FMODE_EXEC)
295 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
300 /* If CREATE, cancel parent's UPDATE lock. */
301 if (it->it_op & IT_CREAT)
305 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
307 MDS_INODELOCK_UPDATE);
309 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
310 &RQF_LDLM_INTENT_OPEN);
/* allocation failed: release the collected cancel list first */
312 ldlm_lock_list_put(&cancels, l_bl_ast, count);
313 RETURN(ERR_PTR(-ENOMEM));
316 /* parent capability */
317 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
318 /* child capability, reserve the size according to parent capa, it will
319 * be filled after we get the reply */
320 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
322 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
323 op_data->op_namelen + 1);
/* EA buffer must hold at least the default MDS EA size */
324 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
325 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
327 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
329 ptlrpc_request_free(req);
/* open requests are kept for replay on a replayable import */
333 spin_lock(&req->rq_lock);
334 req->rq_replay = req->rq_import->imp_replayable;
335 spin_unlock(&req->rq_lock);
337 /* pack the intent */
338 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
339 lit->opc = (__u64)it->it_op;
341 /* pack the intended request */
342 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
345 /* for remote client, fetch remote perm for current user */
346 if (client_is_remote(exp))
347 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
348 sizeof(struct mdt_remote_perm));
349 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_UNLINK request: pack the intent opcode and the
 * unlink body, and reserve reply space for the unlinked file's MD and
 * unlink cookies (sized by the client's current maxima).
 * Returns the request or ERR_PTR(-ENOMEM); error handling for
 * ldlm_prep_enqueue_req() and the final RETURN are not visible in this
 * extract. */
353 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
354 struct lookup_intent *it,
355 struct md_op_data *op_data)
357 struct ptlrpc_request *req;
358 struct obd_device *obddev = class_exp2obd(exp);
359 struct ldlm_intent *lit;
363 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
364 &RQF_LDLM_INTENT_UNLINK);
366 RETURN(ERR_PTR(-ENOMEM));
368 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
369 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
370 op_data->op_namelen + 1);
/* no cancels needed for unlink enqueue */
372 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
374 ptlrpc_request_free(req);
378 /* pack the intent */
379 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
380 lit->opc = (__u64)it->it_op;
382 /* pack the intended request */
383 mdc_unlink_pack(req, op_data);
385 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
386 obddev->u.cli.cl_max_mds_easize);
387 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
388 obddev->u.cli.cl_max_mds_cookiesize);
389 ptlrpc_request_set_replen(req);
/* Build an LDLM_INTENT_GETATTR request: request a broad set of valid
 * bits (attrs, EA, capabilities, and ACL or remote-perm depending on
 * whether the client is remote), pack the intent plus getattr body,
 * and reserve reply space for the MD (and remote perm if applicable).
 * Returns the request or ERR_PTR(-ENOMEM); error handling for
 * ldlm_prep_enqueue_req() and the final RETURN are not visible in this
 * extract. */
393 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
394 struct lookup_intent *it,
395 struct md_op_data *op_data)
397 struct ptlrpc_request *req;
398 struct obd_device *obddev = class_exp2obd(exp);
399 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
400 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
401 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
402 (client_is_remote(exp) ?
403 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
404 struct ldlm_intent *lit;
408 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
409 &RQF_LDLM_INTENT_GETATTR);
411 RETURN(ERR_PTR(-ENOMEM));
413 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
414 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
415 op_data->op_namelen + 1);
417 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
419 ptlrpc_request_free(req);
423 /* pack the intent */
424 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
425 lit->opc = (__u64)it->it_op;
427 /* pack the intended request */
428 mdc_getattr_pack(req, valid, it->it_flags, op_data,
429 obddev->u.cli.cl_max_mds_easize);
431 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
432 obddev->u.cli.cl_max_mds_easize);
433 if (client_is_remote(exp))
434 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
435 sizeof(struct mdt_remote_perm));
436 ptlrpc_request_set_replen(req);
/* Build a plain LDLM_ENQUEUE request with no intent body - used by
 * mdc_enqueue() for IT_READDIR / IT_LAYOUT (see caller).  Returns the
 * request or ERR_PTR(-ENOMEM); prep-failure handling and the final
 * RETURN are not visible in this extract. */
440 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
442 struct ptlrpc_request *req;
446 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
448 RETURN(ERR_PTR(-ENOMEM));
450 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
452 ptlrpc_request_free(req);
456 ptlrpc_request_set_replen(req);
/* Post-process an intent enqueue reply: copy the server's disposition/
 * status into the intent, fix up the lock mode if the server changed
 * it, clear the replay flag for failed opens, swab and sanity-check
 * the reply body, stash the LOV EA for open replay, and (for
 * IT_LAYOUT) install the layout LVB on the lock.
 * NOTE(review): many lines (the trailing parameter - presumably the
 * enqueue rc - early-exit branches, ENTRY/RETURN, and several error
 * checks) are missing from this extract; comments below describe only
 * what is visible. */
460 static int mdc_finish_enqueue(struct obd_export *exp,
461 struct ptlrpc_request *req,
462 struct ldlm_enqueue_info *einfo,
463 struct lookup_intent *it,
464 struct lustre_handle *lockh,
467 struct req_capsule *pill = &req->rq_pill;
468 struct ldlm_request *lockreq;
469 struct ldlm_reply *lockrep;
470 struct lustre_intent_data *intent = &it->d.lustre;
474 /* Similarly, if we're going to replay this request, we don't want to
475 * actually get a lock, just perform the intent. */
476 if (req->rq_transno || req->rq_replay) {
477 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
478 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
481 if (rc == ELDLM_LOCK_ABORTED) {
/* intent executed but no lock granted: hand back a zeroed handle */
483 memset(lockh, 0, sizeof(*lockh));
485 } else { /* rc = 0 */
486 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
489 /* If the server gave us back a different lock mode, we should
490 * fix up our variables. */
491 if (lock->l_req_mode != einfo->ei_mode) {
492 ldlm_lock_addref(lockh, lock->l_req_mode);
493 ldlm_lock_decref(lockh, einfo->ei_mode);
494 einfo->ei_mode = lock->l_req_mode;
499 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
500 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* propagate the server's verdict into the intent */
502 intent->it_disposition = (int)lockrep->lock_policy_res1;
503 intent->it_status = (int)lockrep->lock_policy_res2;
504 intent->it_lock_mode = einfo->ei_mode;
505 intent->it_lock_handle = lockh->cookie;
506 intent->it_data = req;
508 /* Technically speaking rq_transno must already be zero if
509 * it_status is in error, so the check is a bit redundant */
510 if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
511 mdc_clear_replay_flag(req, intent->it_status);
513 /* If we're doing an IT_OPEN which did not result in an actual
514 * successful open, then we need to remove the bit which saves
515 * this request for unconditional replay.
517 * It's important that we do this first! Otherwise we might exit the
518 * function without doing so, and try to replay a failed create
520 if (it->it_op & IT_OPEN && req->rq_replay &&
521 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
522 mdc_clear_replay_flag(req, intent->it_status);
524 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
525 it->it_op, intent->it_disposition, intent->it_status);
527 /* We know what to expect, so we do any byte flipping required here */
528 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
529 struct mdt_body *body;
531 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
533 CERROR ("Can't swab mdt_body\n");
537 if (it_disposition(it, DISP_OPEN_OPEN) &&
538 !it_open_error(DISP_OPEN_OPEN, it)) {
540 * If this is a successful OPEN request, we need to set
541 * replay handler and data early, so that if replay
542 * happens immediately after swabbing below, new reply
543 * is swabbed by that handler correctly.
545 mdc_set_open_replay_data(NULL, NULL, req);
548 /* TODO: make sure LAYOUT lock must be granted along with EA */
550 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
553 mdc_update_max_ea_from_body(exp, body);
556 * The eadata is opaque; just check that it is there.
557 * Eventually, obd_unpackmd() will check the contents.
559 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
565 * We save the reply LOV EA in case we have to replay a
566 * create for recovery. If we didn't allocate a large
567 * enough request buffer above we need to reallocate it
568 * here to hold the actual LOV EA.
570 * To not save LOV EA if request is not going to replay
571 * (for example error one).
573 if ((it->it_op & IT_OPEN) && req->rq_replay) {
575 if (req_capsule_get_size(pill, &RMF_EADATA,
/* request buffer too small: grow it for the real EA */
578 mdc_realloc_openmsg(req, body);
580 req_capsule_shrink(pill, &RMF_EADATA,
584 req_capsule_set_size(pill, &RMF_EADATA,
588 lmm = req_capsule_client_get(pill, &RMF_EADATA);
590 memcpy(lmm, eadata, body->eadatasize);
594 if (body->valid & OBD_MD_FLRMTPERM) {
595 struct mdt_remote_perm *perm;
597 LASSERT(client_is_remote(exp));
598 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
599 lustre_swab_mdt_remote_perm);
603 if (body->valid & OBD_MD_FLMDSCAPA) {
604 struct lustre_capa *capa, *p;
606 capa = req_capsule_server_get(pill, &RMF_CAPA1);
610 if (it->it_op & IT_OPEN) {
611 /* client fid capa will be checked in replay */
612 p = req_capsule_client_get(pill, &RMF_CAPA2);
617 if (body->valid & OBD_MD_FLOSSCAPA) {
618 struct lustre_capa *capa;
620 capa = req_capsule_server_get(pill, &RMF_CAPA2);
624 } else if (it->it_op & IT_LAYOUT) {
625 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
627 if (lock != NULL && lock->l_lvb_data == NULL) {
630 /* maybe the lock was granted right away and layout
631 * is packed into RMF_DLM_LVB of req */
632 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB,
638 lvb = req_capsule_server_get(pill,
645 OBD_ALLOC_LARGE(lmm, lvb_len);
650 memcpy(lmm, lvb, lvb_len);
652 /* install lvb_data */
653 lock_res_and_lock(lock);
654 LASSERT(lock->l_lvb_data == NULL);
655 lock->l_lvb_data = lmm;
656 lock->l_lvb_len = lvb_len;
657 unlock_res_and_lock(lock);
667 /* We always reserve enough space in the reply packet for a stripe MD, because
668 * we don't know in advance the file type. */
/* Send an (intent) enqueue to the MDS: choose inodebits policy from the
 * intent op, build the matching request (open/unlink/getattr/plain),
 * rate-limit via the rpc_lock and in-flight counter, call
 * ldlm_cli_enqueue(), retry indefinitely on -EINPROGRESS for creates,
 * then hand off to mdc_finish_enqueue().
 * NOTE(review): this extract omits many lines (rc declaration,
 * ENTRY/RETURN, several braces, the "resend:" label that the
 * EINPROGRESS path presumably jumps back to, and parts of the flock
 * branch), so control flow described here is partial. */
669 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
670 struct lookup_intent *it, struct md_op_data *op_data,
671 struct lustre_handle *lockh, void *lmm, int lmmsize,
672 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
674 struct obd_device *obddev = class_exp2obd(exp);
675 struct ptlrpc_request *req = NULL;
676 __u64 flags, saved_flags = extra_lock_flags;
678 struct ldlm_res_id res_id;
679 static const ldlm_policy_data_t lookup_policy =
680 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
681 static const ldlm_policy_data_t update_policy =
682 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
683 static const ldlm_policy_data_t layout_policy =
684 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
685 ldlm_policy_data_t const *policy = &lookup_policy;
686 int generation, resends = 0;
687 struct ldlm_reply *lockrep;
/* metadata intents always use inodebits locks */
690 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
693 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
696 saved_flags |= LDLM_FL_HAS_INTENT;
697 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
698 policy = &update_policy;
699 else if (it->it_op & IT_LAYOUT)
700 policy = &layout_policy;
703 LASSERT(reqp == NULL);
/* remember the import generation to detect eviction across resends */
705 generation = obddev->u.cli.cl_import->imp_generation;
709 /* The only way right now is FLOCK, in this case we hide flock
710 policy as lmm, but lmmsize is 0 */
711 LASSERT(lmm && lmmsize == 0);
712 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
714 policy = (ldlm_policy_data_t *)lmm;
715 res_id.name[3] = LDLM_FLOCK;
716 } else if (it->it_op & IT_OPEN) {
717 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
719 policy = &update_policy;
720 einfo->ei_cbdata = NULL;
722 } else if (it->it_op & IT_UNLINK)
723 req = mdc_intent_unlink_pack(exp, it, op_data);
724 else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
725 req = mdc_intent_getattr_pack(exp, it, op_data);
726 else if (it->it_op & (IT_READDIR | IT_LAYOUT))
727 req = ldlm_enqueue_pack(exp);
734 RETURN(PTR_ERR(req));
736 if (req != NULL && it && it->it_op & IT_CREAT)
737 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
739 req->rq_no_retry_einprogress = 1;
/* on a resend, pin the request to the original import generation */
742 req->rq_generation_set = 1;
743 req->rq_import_generation = generation;
744 req->rq_sent = cfs_time_current_sec() + resends;
747 /* It is important to obtain rpc_lock first (if applicable), so that
748 * threads that are serialised with rpc_lock are not polluting our
749 * rpcs in flight counter. We do not do flock request limiting, though*/
751 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
752 rc = mdc_enter_request(&obddev->u.cli);
754 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
755 mdc_clear_replay_flag(req, 0);
756 ptlrpc_req_finished(req);
761 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
764 /* For flock requests we immediately return without further
765 delay and let caller deal with the rest, since rest of
766 this function metadata processing makes no sense for flock
/* release the in-flight slot and rpc_lock taken above */
771 mdc_exit_request(&obddev->u.cli);
772 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
775 CERROR("ldlm_cli_enqueue: %d\n", rc);
776 mdc_clear_replay_flag(req, rc);
777 ptlrpc_req_finished(req);
781 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
782 LASSERT(lockrep != NULL);
784 /* Retry the create infinitely when we get -EINPROGRESS from
785 * server. This is required by the new quota design. */
786 if (it && it->it_op & IT_CREAT &&
787 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
788 mdc_clear_replay_flag(req, rc)_
789 ptlrpc_req_finished(req);
792 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
793 obddev->obd_name, resends, it->it_op,
794 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
796 if (generation == obddev->u.cli.cl_import->imp_generation) {
799 CDEBUG(D_HA, "resend cross eviction\n");
804 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Fix up intent/lock state after the enqueue RPC so the VFS sees the
 * pre-operation view: propagate intent errors, detect stale
 * revalidation (fid mismatch), take extra request references for
 * successful CREATE/OPEN (dropped later in ll_create_node /
 * ll_file_open), and if an equivalent lock already exists, cancel the
 * new one and reuse the old handle.
 * NOTE(review): the extract omits ENTRY/RETURN lines, some braces, and
 * intermediate returns, so some paths are only partially visible. */
809 static int mdc_finish_intent_lock(struct obd_export *exp,
810 struct ptlrpc_request *request,
811 struct md_op_data *op_data,
812 struct lookup_intent *it,
813 struct lustre_handle *lockh)
815 struct lustre_handle old_lock;
816 struct mdt_body *mdt_body;
817 struct ldlm_lock *lock;
821 LASSERT(request != NULL);
822 LASSERT(request != LP_POISON);
823 LASSERT(request->rq_repmsg != LP_POISON);
825 if (!it_disposition(it, DISP_IT_EXECD)) {
826 /* The server failed before it even started executing the
827 * intent, i.e. because it couldn't unpack the request. */
828 LASSERT(it->d.lustre.it_status != 0);
829 RETURN(it->d.lustre.it_status);
831 rc = it_open_error(DISP_IT_EXECD, it);
835 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
836 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
838 /* If we were revalidating a fid/name pair, mark the intent in
839 * case we fail and get called again from lookup */
840 if (fid_is_sane(&op_data->op_fid2) &&
841 it->it_create_mode & M_CHECK_STALE &&
842 it->it_op != IT_GETATTR) {
843 it_set_disposition(it, DISP_ENQ_COMPLETE);
845 /* Also: did we find the same inode? */
846 /* server can return one of two fids:
847 * op_fid2 - new allocated fid - if file is created.
848 * op_fid3 - existent fid - if file only open.
849 * op_fid3 is saved in lmv_intent_open */
850 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
851 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
852 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
853 "\n", PFID(&op_data->op_fid2),
854 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
859 rc = it_open_error(DISP_LOOKUP_EXECD, it);
863 /* keep requests around for the multiple phases of the call
864 * this shows the DISP_XX must guarantee we make it into the call
866 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
867 it_disposition(it, DISP_OPEN_CREATE) &&
868 !it_open_error(DISP_OPEN_CREATE, it)) {
869 it_set_disposition(it, DISP_ENQ_CREATE_REF);
870 ptlrpc_request_addref(request); /* balanced in ll_create_node */
872 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
873 it_disposition(it, DISP_OPEN_OPEN) &&
874 !it_open_error(DISP_OPEN_OPEN, it)) {
875 it_set_disposition(it, DISP_ENQ_OPEN_REF);
876 ptlrpc_request_addref(request); /* balanced in ll_file_open */
877 /* BUG 11546 - eviction in the middle of open rpc processing */
878 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
881 if (it->it_op & IT_CREAT) {
882 /* XXX this belongs in ll_create_it */
883 } else if (it->it_op == IT_OPEN) {
884 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
886 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
889 /* If we already have a matching lock, then cancel the new
890 * one. We have to set the data here instead of in
891 * mdc_enqueue, because we need to use the child's inode as
892 * the l_ast_data to match, and that's not available until
893 * intent_finish has performed the iget().) */
894 lock = ldlm_handle2lock(lockh);
896 ldlm_policy_data_t policy = lock->l_policy_data;
897 LDLM_DEBUG(lock, "matching against this");
/* sanity: the granted lock must cover the fid the server returned */
899 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
900 &lock->l_resource->lr_name),
901 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
902 (unsigned long)lock->l_resource->lr_name.name[0],
903 (unsigned long)lock->l_resource->lr_name.name[1],
904 (unsigned long)lock->l_resource->lr_name.name[2],
905 (unsigned long)fid_seq(&mdt_body->fid1),
906 (unsigned long)fid_oid(&mdt_body->fid1),
907 (unsigned long)fid_ver(&mdt_body->fid1));
910 memcpy(&old_lock, lockh, sizeof(*lockh));
911 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
912 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
/* duplicate: drop the fresh lock, keep the existing one */
913 ldlm_lock_decref_and_cancel(lockh,
914 it->d.lustre.it_lock_mode);
915 memcpy(lockh, &old_lock, sizeof(old_lock));
916 it->d.lustre.it_lock_handle = lockh->cookie;
919 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
920 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
921 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Check whether a usable lock already exists for @fid: first try the
 * handle cached in the intent, otherwise match in the namespace with
 * an inodebits policy chosen from the intent op (the selecting
 * switch/if lines are not visible in this extract - UPDATE, LAYOUT or
 * LOOKUP bits).  On success the intent's lock handle/mode are filled
 * in; on failure they are zeroed.  @bits, if non-NULL, receives the
 * matched inodebits (via ldlm_revalidate_lock_handle). */
925 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
926 struct lu_fid *fid, __u64 *bits)
928 /* We could just return 1 immediately, but since we should only
929 * be called in revalidate_it if we already have a lock, let's
931 struct ldlm_res_id res_id;
932 struct lustre_handle lockh;
933 ldlm_policy_data_t policy;
937 if (it->d.lustre.it_lock_handle) {
938 lockh.cookie = it->d.lustre.it_lock_handle;
939 mode = ldlm_revalidate_lock_handle(&lockh, bits);
941 fid_build_reg_res_name(fid, &res_id);
944 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
947 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
950 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
953 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
954 LDLM_FL_BLOCK_GRANTED, &res_id,
956 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
960 it->d.lustre.it_lock_handle = lockh.cookie;
961 it->d.lustre.it_lock_mode = mode;
963 it->d.lustre.it_lock_handle = 0;
964 it->d.lustre.it_lock_mode = 0;
971 * This long block is all about fixing up the lock and request state
972 * so that it is correct as of the moment _before_ the operation was
973 * applied; that way, the VFS will think that everything is normal and
974 * call Lustre's regular VFS methods.
976 * If we're performing a creation, that means that unless the creation
977 * failed with EEXIST, we should fake up a negative dentry.
979 * For everything else, we want to lookup to succeed.
981 * One additional note: if CREATE or OPEN succeeded, we add an extra
982 * reference to the request because we need to keep it around until
983 * ll_create/ll_open gets called.
985 * The server will return to us, in it_disposition, an indication of
986 * exactly what d.lustre.it_status refers to.
988 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
989 * otherwise if DISP_OPEN_CREATE is set, then it status is the
990 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
991 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
994 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/* Top-level intent-lock entry point: try revalidation via an existing
 * lock first; if that does not settle the intent, run the enqueue
 * (allocating a fid for creates if the upper layer did not), then
 * finish with mdc_finish_intent_lock().
 * NOTE(review): this extract is missing ENTRY/RETURN lines, error-path
 * returns, and some braces. */
997 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
998 void *lmm, int lmmsize, struct lookup_intent *it,
999 int lookup_flags, struct ptlrpc_request **reqp,
1000 ldlm_blocking_callback cb_blocking,
1001 __u64 extra_lock_flags)
1003 struct lustre_handle lockh;
1008 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1009 ", intent: %s flags %#o\n", op_data->op_namelen,
1010 op_data->op_name, PFID(&op_data->op_fid2),
1011 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1015 if (fid_is_sane(&op_data->op_fid2) &&
1016 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
1017 /* We could just return 1 immediately, but since we should only
1018 * be called in revalidate_it if we already have a lock, let's
1020 it->d.lustre.it_lock_handle = 0;
1021 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1022 /* Only return failure if it was not GETATTR by cfid
1023 (from inode_revalidate) */
1024 if (rc || op_data->op_namelen != 0)
1028 /* lookup_it may be called only after revalidate_it has run, because
1029 * revalidate_it cannot return errors, only zero. Returning zero causes
1030 * this call to lookup, which *can* return an error.
1032 * We only want to execute the request associated with the intent one
1033 * time, however, so don't send the request again. Instead, skip past
1034 * this and use the request from revalidate. In this case, revalidate
1035 * never dropped its reference, so the refcounts are all OK */
1036 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1037 struct ldlm_enqueue_info einfo =
1038 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1039 ldlm_completion_ast, NULL, NULL, NULL };
1041 /* For case if upper layer did not alloc fid, do it now. */
1042 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1043 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1045 CERROR("Can't alloc new fid, rc %d\n", rc);
1049 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1050 lmm, lmmsize, NULL, extra_lock_flags);
1053 } else if (!fid_is_sane(&op_data->op_fid2) ||
1054 !(it->it_create_mode & M_CHECK_STALE)) {
1055 /* DISP_ENQ_COMPLETE set means there is extra reference on
1056 * request referenced from this intent, saved for subsequent
1057 * lookup. This path is executed when we proceed to this
1058 * lookup, so we clear DISP_ENQ_COMPLETE */
1059 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1061 *reqp = it->d.lustre.it_data;
1062 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/* ptlrpc interpret callback for the async getattr intent: release the
 * in-flight slot, finish the ldlm enqueue, run the common
 * finish_enqueue/finish_intent_lock processing, free the einfo that
 * mdc_intent_getattr_async() allocated, and invoke the caller's
 * mi_cb with the final rc.
 * NOTE(review): the extract omits the third parameter (async args
 * pointer, apparently @args), the assignment of @it from minfo, and
 * the intermediate rc checks/RETURN. */
1066 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1067 struct ptlrpc_request *req,
1070 struct mdc_getattr_args *ga = args;
1071 struct obd_export *exp = ga->ga_exp;
1072 struct md_enqueue_info *minfo = ga->ga_minfo;
1073 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1074 struct lookup_intent *it;
1075 struct lustre_handle *lockh;
1076 struct obd_device *obddev;
1077 __u64 flags = LDLM_FL_HAS_INTENT;
1081 lockh = &minfo->mi_lockh;
1083 obddev = class_exp2obd(exp);
/* release the in-flight slot taken in mdc_intent_getattr_async() */
1085 mdc_exit_request(&obddev->u.cli);
1086 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1089 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1090 &flags, NULL, 0, lockh, rc);
1092 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1093 mdc_clear_replay_flag(req, rc);
1097 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1101 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* einfo was heap-allocated by the caller of the async getattr */
1105 OBD_FREE_PTR(einfo);
1106 minfo->mi_cb(req, minfo, rc);
/* Fire-and-forget getattr intent (used for statahead): pack a getattr
 * intent request, take an in-flight slot, start the enqueue without
 * waiting (async = 1), stash the completion context in rq_async_args,
 * and queue the request on ptlrpcd.  Completion is handled by
 * mdc_intent_getattr_async_interpret().
 * NOTE(review): the function continues past the end of this extract
 * (final RETURN not visible); ga->ga_exp assignment is also missing
 * between lines 1155 and 1157. */
1110 int mdc_intent_getattr_async(struct obd_export *exp,
1111 struct md_enqueue_info *minfo,
1112 struct ldlm_enqueue_info *einfo)
1114 struct md_op_data *op_data = &minfo->mi_data;
1115 struct lookup_intent *it = &minfo->mi_it;
1116 struct ptlrpc_request *req;
1117 struct mdc_getattr_args *ga;
1118 struct obd_device *obddev = class_exp2obd(exp);
1119 struct ldlm_res_id res_id;
1120 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1121 * for statahead currently. Consider CMD in future, such two bits
1122 * maybe managed by different MDS, should be adjusted then. */
1123 ldlm_policy_data_t policy = {
1124 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1125 MDS_INODELOCK_UPDATE }
1128 __u64 flags = LDLM_FL_HAS_INTENT;
1131 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1132 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1133 ldlm_it2str(it->it_op), it->it_flags);
1135 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1136 req = mdc_intent_getattr_pack(exp, it, op_data);
/* reserve an in-flight slot; released by the interpret callback */
1140 rc = mdc_enter_request(&obddev->u.cli);
1142 ptlrpc_req_finished(req);
/* async = 1: returns immediately, reply handled via interpret */
1146 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1147 0, &minfo->mi_lockh, 1);
1149 mdc_exit_request(&obddev->u.cli);
1150 ptlrpc_req_finished(req);
1154 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1155 ga = ptlrpc_req_async_args(req);
1157 ga->ga_minfo = minfo;
1158 ga->ga_einfo = einfo;
1160 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1161 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);