4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_MDC
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
45 # include <liblustre.h>
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/*
 * Context saved in a request's async args for an asynchronous getattr
 * intent enqueue; consumed by mdc_intent_getattr_async_interpret().
 */
struct mdc_getattr_args {
        struct obd_export        *ga_exp;   /* export the enqueue was sent on */
        struct md_enqueue_info   *ga_minfo; /* caller's enqueue state; mi_cb is
                                             * invoked when the RPC completes */
        struct ldlm_enqueue_info *ga_einfo; /* LDLM enqueue parameters, freed by
                                             * the interpret callback */
};
62 int it_disposition(struct lookup_intent *it, int flag)
64 return it->d.lustre.it_disposition & flag;
66 EXPORT_SYMBOL(it_disposition);
68 void it_set_disposition(struct lookup_intent *it, int flag)
70 it->d.lustre.it_disposition |= flag;
72 EXPORT_SYMBOL(it_set_disposition);
74 void it_clear_disposition(struct lookup_intent *it, int flag)
76 it->d.lustre.it_disposition &= ~flag;
78 EXPORT_SYMBOL(it_clear_disposition);
80 int it_open_error(int phase, struct lookup_intent *it)
82 if (it_disposition(it, DISP_OPEN_OPEN)) {
83 if (phase >= DISP_OPEN_OPEN)
84 return it->d.lustre.it_status;
89 if (it_disposition(it, DISP_OPEN_CREATE)) {
90 if (phase >= DISP_OPEN_CREATE)
91 return it->d.lustre.it_status;
96 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97 if (phase >= DISP_LOOKUP_EXECD)
98 return it->d.lustre.it_status;
103 if (it_disposition(it, DISP_IT_EXECD)) {
104 if (phase >= DISP_IT_EXECD)
105 return it->d.lustre.it_status;
109 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110 it->d.lustre.it_status);
114 EXPORT_SYMBOL(it_open_error);
116 /* this must be called on a lockh that is known to have a referenced lock */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
120 struct ldlm_lock *lock;
129 lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131 LASSERT(lock != NULL);
132 lock_res_and_lock(lock);
134 if (lock->l_ast_data && lock->l_ast_data != data) {
135 struct inode *new_inode = data;
136 struct inode *old_inode = lock->l_ast_data;
137 LASSERTF(old_inode->i_state & I_FREEING,
138 "Found existing inode %p/%lu/%u state %lu in lock: "
139 "setting data to %p/%lu/%u\n", old_inode,
140 old_inode->i_ino, old_inode->i_generation,
142 new_inode, new_inode->i_ino, new_inode->i_generation);
145 lock->l_ast_data = data;
147 *bits = lock->l_policy_data.l_inodebits.bits;
149 unlock_res_and_lock(lock);
155 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
156 const struct lu_fid *fid, ldlm_type_t type,
157 ldlm_policy_data_t *policy, ldlm_mode_t mode,
158 struct lustre_handle *lockh)
160 struct ldlm_res_id res_id;
164 fid_build_reg_res_name(fid, &res_id);
165 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166 &res_id, type, policy, mode, lockh, 0);
170 int mdc_cancel_unused(struct obd_export *exp,
171 const struct lu_fid *fid,
172 ldlm_policy_data_t *policy,
174 ldlm_cancel_flags_t flags,
177 struct ldlm_res_id res_id;
178 struct obd_device *obd = class_exp2obd(exp);
183 fid_build_reg_res_name(fid, &res_id);
184 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
185 policy, mode, flags, opaque);
189 int mdc_change_cbdata(struct obd_export *exp,
190 const struct lu_fid *fid,
191 ldlm_iterator_t it, void *data)
193 struct ldlm_res_id res_id;
196 fid_build_reg_res_name(fid, &res_id);
197 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
204 /* find any ldlm lock of the inode in mdc
208 int mdc_find_cbdata(struct obd_export *exp,
209 const struct lu_fid *fid,
210 ldlm_iterator_t it, void *data)
212 struct ldlm_res_id res_id;
216 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
217 rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
219 if (rc == LDLM_ITER_STOP)
221 else if (rc == LDLM_ITER_CONTINUE)
226 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
228 /* Don't hold error requests for replay. */
229 if (req->rq_replay) {
230 cfs_spin_lock(&req->rq_lock);
232 cfs_spin_unlock(&req->rq_lock);
234 if (rc && req->rq_transno != 0) {
235 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
240 /* Save a large LOV EA into the request buffer so that it is available
241 * for replay. We don't do this in the initial request because the
242 * original request doesn't need this buffer (at most it sends just the
243 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
244 * buffer and may also be difficult to allocate and save a very large
245 * request buffer for each open. (bug 5707)
247 * OOM here may cause recovery failure if lmm is needed (only for the
248 * original open if the MDS crashed just when this client also OOM'd)
249 * but this is incredibly unlikely, and questionable whether the client
250 * could do MDS recovery under OOM anyways... */
251 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
252 struct mdt_body *body)
256 /* FIXME: remove this explicit offset. */
257 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
260 CERROR("Can't enlarge segment %d size to %d\n",
261 DLM_INTENT_REC_OFF + 4, body->eadatasize);
262 body->valid &= ~OBD_MD_FLEASIZE;
263 body->eadatasize = 0;
267 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
268 struct lookup_intent *it,
269 struct md_op_data *op_data,
270 void *lmm, int lmmsize,
273 struct ptlrpc_request *req;
274 struct obd_device *obddev = class_exp2obd(exp);
275 struct ldlm_intent *lit;
276 CFS_LIST_HEAD(cancels);
282 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
284 /* XXX: openlock is not cancelled for cross-refs. */
285 /* If inode is known, cancel conflicting OPEN locks. */
286 if (fid_is_sane(&op_data->op_fid2)) {
287 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
290 else if (it->it_flags & FMODE_EXEC)
295 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
300 /* If CREATE, cancel parent's UPDATE lock. */
301 if (it->it_op & IT_CREAT)
305 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
307 MDS_INODELOCK_UPDATE);
309 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
310 &RQF_LDLM_INTENT_OPEN);
312 ldlm_lock_list_put(&cancels, l_bl_ast, count);
313 RETURN(ERR_PTR(-ENOMEM));
316 /* parent capability */
317 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
318 /* child capability, reserve the size according to parent capa, it will
319 * be filled after we get the reply */
320 mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
322 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
323 op_data->op_namelen + 1);
324 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
325 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
327 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
329 ptlrpc_request_free(req);
333 cfs_spin_lock(&req->rq_lock);
334 req->rq_replay = req->rq_import->imp_replayable;
335 cfs_spin_unlock(&req->rq_lock);
337 /* pack the intent */
338 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
339 lit->opc = (__u64)it->it_op;
341 /* pack the intended request */
342 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
345 /* for remote client, fetch remote perm for current user */
346 if (client_is_remote(exp))
347 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
348 sizeof(struct mdt_remote_perm));
349 ptlrpc_request_set_replen(req);
353 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
354 struct lookup_intent *it,
355 struct md_op_data *op_data)
357 struct ptlrpc_request *req;
358 struct obd_device *obddev = class_exp2obd(exp);
359 struct ldlm_intent *lit;
363 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
364 &RQF_LDLM_INTENT_UNLINK);
366 RETURN(ERR_PTR(-ENOMEM));
368 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
369 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
370 op_data->op_namelen + 1);
372 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
374 ptlrpc_request_free(req);
378 /* pack the intent */
379 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
380 lit->opc = (__u64)it->it_op;
382 /* pack the intended request */
383 mdc_unlink_pack(req, op_data);
385 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
386 obddev->u.cli.cl_max_mds_easize);
387 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
388 obddev->u.cli.cl_max_mds_cookiesize);
389 ptlrpc_request_set_replen(req);
393 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
394 struct lookup_intent *it,
395 struct md_op_data *op_data)
397 struct ptlrpc_request *req;
398 struct obd_device *obddev = class_exp2obd(exp);
399 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
400 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
401 OBD_MD_FLMDSCAPA | OBD_MD_MEA |
402 (client_is_remote(exp) ?
403 OBD_MD_FLRMTPERM : OBD_MD_FLACL);
404 struct ldlm_intent *lit;
408 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
409 &RQF_LDLM_INTENT_GETATTR);
411 RETURN(ERR_PTR(-ENOMEM));
413 mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
414 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
415 op_data->op_namelen + 1);
417 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
419 ptlrpc_request_free(req);
423 /* pack the intent */
424 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
425 lit->opc = (__u64)it->it_op;
427 /* pack the intended request */
428 mdc_getattr_pack(req, valid, it->it_flags, op_data,
429 obddev->u.cli.cl_max_mds_easize);
431 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
432 obddev->u.cli.cl_max_mds_easize);
433 if (client_is_remote(exp))
434 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
435 sizeof(struct mdt_remote_perm));
436 ptlrpc_request_set_replen(req);
440 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
442 struct ptlrpc_request *req;
446 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
448 RETURN(ERR_PTR(-ENOMEM));
450 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
452 ptlrpc_request_free(req);
456 ptlrpc_request_set_replen(req);
/*
 * mdc_finish_enqueue() - post-process the reply of an intent enqueue RPC:
 * copy the server's disposition/status into the intent, fix up the granted
 * lock mode, and stash reply data (LOV EA for replay, remote perms,
 * capabilities, layout LVB) as needed.
 *
 * NOTE(review): this extraction is missing interior lines (braces, some
 * declarations and error paths), so only comments are added here; the code
 * text is left untouched.
 */
460 static int mdc_finish_enqueue(struct obd_export *exp,
461 struct ptlrpc_request *req,
462 struct ldlm_enqueue_info *einfo,
463 struct lookup_intent *it,
464 struct lustre_handle *lockh,
467 struct req_capsule *pill = &req->rq_pill;
468 struct ldlm_request *lockreq;
469 struct ldlm_reply *lockrep;
470 struct lustre_intent_data *intent = &it->d.lustre;
474 /* Similarly, if we're going to replay this request, we don't want to
475 * actually get a lock, just perform the intent. */
476 if (req->rq_transno || req->rq_replay) {
477 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
478 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
/* Server aborted the lock part of the enqueue: no lock handle is valid. */
481 if (rc == ELDLM_LOCK_ABORTED) {
483 memset(lockh, 0, sizeof(*lockh));
485 } else { /* rc = 0 */
486 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
489 /* If the server gave us back a different lock mode, we should
490 * fix up our variables. */
491 if (lock->l_req_mode != einfo->ei_mode) {
492 ldlm_lock_addref(lockh, lock->l_req_mode);
493 ldlm_lock_decref(lockh, einfo->ei_mode);
494 einfo->ei_mode = lock->l_req_mode;
499 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
500 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
/* Propagate the server's view of the intent into the client intent. */
502 intent->it_disposition = (int)lockrep->lock_policy_res1;
503 intent->it_status = (int)lockrep->lock_policy_res2;
504 intent->it_lock_mode = einfo->ei_mode;
505 intent->it_lock_handle = lockh->cookie;
506 intent->it_data = req;
508 if (intent->it_status < 0 && req->rq_replay)
509 mdc_clear_replay_flag(req, intent->it_status);
511 /* If we're doing an IT_OPEN which did not result in an actual
512 * successful open, then we need to remove the bit which saves
513 * this request for unconditional replay.
515 * It's important that we do this first! Otherwise we might exit the
516 * function without doing so, and try to replay a failed create
518 if (it->it_op & IT_OPEN && req->rq_replay &&
519 (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
520 mdc_clear_replay_flag(req, intent->it_status);
522 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
523 it->it_op, intent->it_disposition, intent->it_status);
525 /* We know what to expect, so we do any byte flipping required here */
526 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
527 struct mdt_body *body;
529 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
531 CERROR ("Can't swab mdt_body\n");
535 if (it_disposition(it, DISP_OPEN_OPEN) &&
536 !it_open_error(DISP_OPEN_OPEN, it)) {
538 * If this is a successful OPEN request, we need to set
539 * replay handler and data early, so that if replay
540 * happens immediately after swabbing below, new reply
541 * is swabbed by that handler correctly.
543 mdc_set_open_replay_data(NULL, NULL, req);
546 /* TODO: make sure LAYOUT lock must be granted along with EA */
548 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
551 mdc_update_max_ea_from_body(exp, body);
554 * The eadata is opaque; just check that it is there.
555 * Eventually, obd_unpackmd() will check the contents.
557 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
563 * We save the reply LOV EA in case we have to replay a
564 * create for recovery. If we didn't allocate a large
565 * enough request buffer above we need to reallocate it
566 * here to hold the actual LOV EA.
568 * To not save LOV EA if request is not going to replay
569 * (for example error one).
571 if ((it->it_op & IT_OPEN) && req->rq_replay) {
573 if (req_capsule_get_size(pill, &RMF_EADATA,
576 mdc_realloc_openmsg(req, body);
578 req_capsule_shrink(pill, &RMF_EADATA,
582 req_capsule_set_size(pill, &RMF_EADATA,
586 lmm = req_capsule_client_get(pill, &RMF_EADATA);
588 memcpy(lmm, eadata, body->eadatasize);
/* Remote clients get a remote-permission block instead of a local ACL. */
592 if (body->valid & OBD_MD_FLRMTPERM) {
593 struct mdt_remote_perm *perm;
595 LASSERT(client_is_remote(exp));
596 perm = req_capsule_server_swab_get(pill, &RMF_ACL,
597 lustre_swab_mdt_remote_perm);
601 if (body->valid & OBD_MD_FLMDSCAPA) {
602 struct lustre_capa *capa, *p;
604 capa = req_capsule_server_get(pill, &RMF_CAPA1);
608 if (it->it_op & IT_OPEN) {
609 /* client fid capa will be checked in replay */
610 p = req_capsule_client_get(pill, &RMF_CAPA2);
615 if (body->valid & OBD_MD_FLOSSCAPA) {
616 struct lustre_capa *capa;
618 capa = req_capsule_server_get(pill, &RMF_CAPA2);
622 } else if (it->it_op & IT_LAYOUT) {
623 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
625 if (lock != NULL && lock->l_lvb_data == NULL) {
628 /* maybe the lock was granted right away and layout
629 * is packed into RMF_DLM_LVB of req */
630 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB,
636 lvb = req_capsule_server_get(pill,
643 OBD_ALLOC_LARGE(lmm, lvb_len);
648 memcpy(lmm, lvb, lvb_len);
650 /* install lvb_data */
651 lock_res_and_lock(lock);
652 LASSERT(lock->l_lvb_data == NULL);
653 lock->l_lvb_data = lmm;
654 lock->l_lvb_len = lvb_len;
655 unlock_res_and_lock(lock);
665 /* We always reserve enough space in the reply packet for a stripe MD, because
666 * we don't know in advance the file type. */
/*
 * mdc_enqueue() - send an (intent) lock enqueue to the MDS, choosing the
 * inodebits policy and request format from the intent operation, with
 * rpc-lock/in-flight accounting and infinite resend of creates that get
 * -EINPROGRESS back from the server.
 *
 * NOTE(review): this extraction is missing interior lines (braces, some
 * declarations and error paths), so only comments are added here; the code
 * text is left untouched.
 */
667 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
668 struct lookup_intent *it, struct md_op_data *op_data,
669 struct lustre_handle *lockh, void *lmm, int lmmsize,
670 struct ptlrpc_request **reqp, int extra_lock_flags)
672 struct obd_device *obddev = class_exp2obd(exp);
673 struct ptlrpc_request *req = NULL;
674 int flags, saved_flags = extra_lock_flags;
676 struct ldlm_res_id res_id;
/* Per-op inodebits policies; LOOKUP is the default, see below. */
677 static const ldlm_policy_data_t lookup_policy =
678 { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
679 static const ldlm_policy_data_t update_policy =
680 { .l_inodebits = { MDS_INODELOCK_UPDATE } };
681 static const ldlm_policy_data_t layout_policy =
682 { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
683 ldlm_policy_data_t const *policy = &lookup_policy;
684 int generation, resends = 0;
685 struct ldlm_reply *lockrep;
688 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
691 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
694 saved_flags |= LDLM_FL_HAS_INTENT;
695 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
696 policy = &update_policy;
697 else if (it->it_op & IT_LAYOUT)
698 policy = &layout_policy;
701 LASSERT(reqp == NULL);
/* Remember the import generation so a resend after eviction is detected. */
703 generation = obddev->u.cli.cl_import->imp_generation;
707 /* The only way right now is FLOCK, in this case we hide flock
708 policy as lmm, but lmmsize is 0 */
709 LASSERT(lmm && lmmsize == 0);
710 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
712 policy = (ldlm_policy_data_t *)lmm;
713 res_id.name[3] = LDLM_FLOCK;
714 } else if (it->it_op & IT_OPEN) {
715 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
717 policy = &update_policy;
718 einfo->ei_cbdata = NULL;
720 } else if (it->it_op & IT_UNLINK)
721 req = mdc_intent_unlink_pack(exp, it, op_data);
722 else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
723 req = mdc_intent_getattr_pack(exp, it, op_data);
724 else if (it->it_op & (IT_READDIR | IT_LAYOUT))
725 req = ldlm_enqueue_pack(exp);
732 RETURN(PTR_ERR(req));
734 if (req != NULL && it && it->it_op & IT_CREAT)
735 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
737 req->rq_no_retry_einprogress = 1;
740 req->rq_generation_set = 1;
741 req->rq_import_generation = generation;
742 req->rq_sent = cfs_time_current_sec() + resends;
745 /* It is important to obtain rpc_lock first (if applicable), so that
746 * threads that are serialised with rpc_lock are not polluting our
747 * rpcs in flight counter. We do not do flock request limiting, though*/
749 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
750 rc = mdc_enter_request(&obddev->u.cli);
752 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
753 mdc_clear_replay_flag(req, 0);
754 ptlrpc_req_finished(req);
759 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
762 /* For flock requests we immediatelly return without further
763 delay and let caller deal with the rest, since rest of
764 this function metadata processing makes no sense for flock
769 mdc_exit_request(&obddev->u.cli);
770 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
773 CERROR("ldlm_cli_enqueue: %d\n", rc);
774 mdc_clear_replay_flag(req, rc);
775 ptlrpc_req_finished(req);
779 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
780 LASSERT(lockrep != NULL);
782 /* Retry the create infinitely when we get -EINPROGRESS from
783 * server. This is required by the new quota design. */
784 if (it && it->it_op & IT_CREAT &&
785 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
786 mdc_clear_replay_flag(req, rc);
787 ptlrpc_req_finished(req);
790 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
791 obddev->obd_name, resends, it->it_op,
792 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
794 if (generation == obddev->u.cli.cl_import->imp_generation) {
797 CDEBUG(D_HA, "resend cross eviction\n");
802 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/*
 * mdc_finish_intent_lock() - interpret the server's intent dispositions:
 * detect stale fid/name revalidations, take extra request references for
 * successful OPEN/CREATE (released later in llite), and replace the newly
 * granted lock with an already-held matching one where possible.
 *
 * NOTE(review): this extraction is missing interior lines (braces, some
 * declarations and error paths), so only comments are added here; the code
 * text is left untouched.
 */
807 static int mdc_finish_intent_lock(struct obd_export *exp,
808 struct ptlrpc_request *request,
809 struct md_op_data *op_data,
810 struct lookup_intent *it,
811 struct lustre_handle *lockh)
813 struct lustre_handle old_lock;
814 struct mdt_body *mdt_body;
815 struct ldlm_lock *lock;
819 LASSERT(request != NULL);
820 LASSERT(request != LP_POISON);
821 LASSERT(request->rq_repmsg != LP_POISON);
823 if (!it_disposition(it, DISP_IT_EXECD)) {
824 /* The server failed before it even started executing the
825 * intent, i.e. because it couldn't unpack the request. */
826 LASSERT(it->d.lustre.it_status != 0);
827 RETURN(it->d.lustre.it_status);
829 rc = it_open_error(DISP_IT_EXECD, it);
833 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
834 LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
836 /* If we were revalidating a fid/name pair, mark the intent in
837 * case we fail and get called again from lookup */
838 if (fid_is_sane(&op_data->op_fid2) &&
839 it->it_create_mode & M_CHECK_STALE &&
840 it->it_op != IT_GETATTR) {
841 it_set_disposition(it, DISP_ENQ_COMPLETE);
843 /* Also: did we find the same inode? */
844 /* server can return one of two fids:
845 * op_fid2 - new allocated fid - if file is created.
846 * op_fid3 - existent fid - if file only open.
847 * op_fid3 is saved in lmv_intent_open */
848 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
849 (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
850 CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
851 "\n", PFID(&op_data->op_fid2),
852 PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
857 rc = it_open_error(DISP_LOOKUP_EXECD, it);
861 /* keep requests around for the multiple phases of the call
862 * this shows the DISP_XX must guarantee we make it into the call
864 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
865 it_disposition(it, DISP_OPEN_CREATE) &&
866 !it_open_error(DISP_OPEN_CREATE, it)) {
867 it_set_disposition(it, DISP_ENQ_CREATE_REF);
868 ptlrpc_request_addref(request); /* balanced in ll_create_node */
870 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
871 it_disposition(it, DISP_OPEN_OPEN) &&
872 !it_open_error(DISP_OPEN_OPEN, it)) {
873 it_set_disposition(it, DISP_ENQ_OPEN_REF);
874 ptlrpc_request_addref(request); /* balanced in ll_file_open */
875 /* BUG 11546 - eviction in the middle of open rpc processing */
876 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
879 if (it->it_op & IT_CREAT) {
880 /* XXX this belongs in ll_create_it */
881 } else if (it->it_op == IT_OPEN) {
882 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
884 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
887 /* If we already have a matching lock, then cancel the new
888 * one. We have to set the data here instead of in
889 * mdc_enqueue, because we need to use the child's inode as
890 * the l_ast_data to match, and that's not available until
891 * intent_finish has performed the iget().) */
892 lock = ldlm_handle2lock(lockh);
894 ldlm_policy_data_t policy = lock->l_policy_data;
895 LDLM_DEBUG(lock, "matching against this");
/* Sanity: the granted lock's resource must match the fid in the reply. */
897 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
898 &lock->l_resource->lr_name),
899 "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
900 (unsigned long)lock->l_resource->lr_name.name[0],
901 (unsigned long)lock->l_resource->lr_name.name[1],
902 (unsigned long)lock->l_resource->lr_name.name[2],
903 (unsigned long)fid_seq(&mdt_body->fid1),
904 (unsigned long)fid_oid(&mdt_body->fid1),
905 (unsigned long)fid_ver(&mdt_body->fid1));
908 memcpy(&old_lock, lockh, sizeof(*lockh));
909 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
910 LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
911 ldlm_lock_decref_and_cancel(lockh,
912 it->d.lustre.it_lock_mode);
913 memcpy(lockh, &old_lock, sizeof(old_lock));
914 it->d.lustre.it_lock_handle = lockh->cookie;
917 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
918 op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
919 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
923 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
924 struct lu_fid *fid, __u64 *bits)
926 /* We could just return 1 immediately, but since we should only
927 * be called in revalidate_it if we already have a lock, let's
929 struct ldlm_res_id res_id;
930 struct lustre_handle lockh;
931 ldlm_policy_data_t policy;
935 if (it->d.lustre.it_lock_handle) {
936 lockh.cookie = it->d.lustre.it_lock_handle;
937 mode = ldlm_revalidate_lock_handle(&lockh, bits);
939 fid_build_reg_res_name(fid, &res_id);
942 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
945 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
948 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
951 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
952 LDLM_FL_BLOCK_GRANTED, &res_id,
954 LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
958 it->d.lustre.it_lock_handle = lockh.cookie;
959 it->d.lustre.it_lock_mode = mode;
961 it->d.lustre.it_lock_handle = 0;
962 it->d.lustre.it_lock_mode = 0;
969 * This long block is all about fixing up the lock and request state
970 * so that it is correct as of the moment _before_ the operation was
971 * applied; that way, the VFS will think that everything is normal and
972 * call Lustre's regular VFS methods.
974 * If we're performing a creation, that means that unless the creation
975 * failed with EEXIST, we should fake up a negative dentry.
977 * For everything else, we want to lookup to succeed.
979 * One additional note: if CREATE or OPEN succeeded, we add an extra
980 * reference to the request because we need to keep it around until
981 * ll_create/ll_open gets called.
983 * The server will return to us, in it_disposition, an indication of
984 * exactly what d.lustre.it_status refers to.
986 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
987 * otherwise if DISP_OPEN_CREATE is set, then it status is the
988 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
989 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
992 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
/*
 * NOTE(review): this extraction is missing interior lines (braces, error
 * paths and some declarations), so only comments are added; the code text
 * is left untouched.
 */
995 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
996 void *lmm, int lmmsize, struct lookup_intent *it,
997 int lookup_flags, struct ptlrpc_request **reqp,
998 ldlm_blocking_callback cb_blocking,
999 int extra_lock_flags)
1001 struct lustre_handle lockh;
1006 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1007 ", intent: %s flags %#o\n", op_data->op_namelen,
1008 op_data->op_name, PFID(&op_data->op_fid2),
1009 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Fast path: if the child fid is known, try to revalidate a held lock. */
1013 if (fid_is_sane(&op_data->op_fid2) &&
1014 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
1015 /* We could just return 1 immediately, but since we should only
1016 * be called in revalidate_it if we already have a lock, let's
1018 it->d.lustre.it_lock_handle = 0;
1019 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1020 /* Only return failure if it was not GETATTR by cfid
1021 (from inode_revalidate) */
1022 if (rc || op_data->op_namelen != 0)
1026 /* lookup_it may be called only after revalidate_it has run, because
1027 * revalidate_it cannot return errors, only zero. Returning zero causes
1028 * this call to lookup, which *can* return an error.
1030 * We only want to execute the request associated with the intent one
1031 * time, however, so don't send the request again. Instead, skip past
1032 * this and use the request from revalidate. In this case, revalidate
1033 * never dropped its reference, so the refcounts are all OK */
1034 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1035 struct ldlm_enqueue_info einfo =
1036 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1037 ldlm_completion_ast, NULL, NULL, NULL };
1039 /* For case if upper layer did not alloc fid, do it now. */
1040 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1041 rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1043 CERROR("Can't alloc new fid, rc %d\n", rc);
1047 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1048 lmm, lmmsize, NULL, extra_lock_flags);
1051 } else if (!fid_is_sane(&op_data->op_fid2) ||
1052 !(it->it_create_mode & M_CHECK_STALE)) {
1053 /* DISP_ENQ_COMPLETE set means there is extra reference on
1054 * request referenced from this intent, saved for subsequent
1055 * lookup. This path is executed when we proceed to this
1056 * lookup, so we clear DISP_ENQ_COMPLETE */
1057 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1059 *reqp = it->d.lustre.it_data;
1060 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1064 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1065 struct ptlrpc_request *req,
1068 struct mdc_getattr_args *ga = args;
1069 struct obd_export *exp = ga->ga_exp;
1070 struct md_enqueue_info *minfo = ga->ga_minfo;
1071 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1072 struct lookup_intent *it;
1073 struct lustre_handle *lockh;
1074 struct obd_device *obddev;
1075 int flags = LDLM_FL_HAS_INTENT;
1079 lockh = &minfo->mi_lockh;
1081 obddev = class_exp2obd(exp);
1083 mdc_exit_request(&obddev->u.cli);
1084 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1087 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1088 &flags, NULL, 0, lockh, rc);
1090 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1091 mdc_clear_replay_flag(req, rc);
1095 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1099 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1103 OBD_FREE_PTR(einfo);
1104 minfo->mi_cb(req, minfo, rc);
/*
 * mdc_intent_getattr_async() - fire off a getattr intent enqueue without
 * waiting: pack the request, enter the in-flight accounting, start the
 * enqueue, save the context in the request's async args and queue it to
 * ptlrpcd; mdc_intent_getattr_async_interpret() finishes the job.
 *
 * NOTE(review): this extraction is missing interior lines and the function
 * continues past the end of the visible chunk, so only comments are added;
 * the code text is left untouched.
 */
1108 int mdc_intent_getattr_async(struct obd_export *exp,
1109 struct md_enqueue_info *minfo,
1110 struct ldlm_enqueue_info *einfo)
1112 struct md_op_data *op_data = &minfo->mi_data;
1113 struct lookup_intent *it = &minfo->mi_it;
1114 struct ptlrpc_request *req;
1115 struct mdc_getattr_args *ga;
1116 struct obd_device *obddev = class_exp2obd(exp);
1117 struct ldlm_res_id res_id;
1118 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1119 * for statahead currently. Consider CMD in future, such two bits
1120 * maybe managed by different MDS, should be adjusted then. */
1121 ldlm_policy_data_t policy = {
1122 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1123 MDS_INODELOCK_UPDATE }
1126 int flags = LDLM_FL_HAS_INTENT;
1129 CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1130 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1131 ldlm_it2str(it->it_op), it->it_flags);
1133 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1134 req = mdc_intent_getattr_pack(exp, it, op_data);
1138 rc = mdc_enter_request(&obddev->u.cli);
1140 ptlrpc_req_finished(req);
/* Async enqueue: returns as soon as the RPC is started. */
1144 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1145 0, &minfo->mi_lockh, 1);
1147 mdc_exit_request(&obddev->u.cli);
1148 ptlrpc_req_finished(req);
1152 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1153 ga = ptlrpc_req_async_args(req);
1155 ga->ga_minfo = minfo;
1156 ga->ga_einfo = einfo;
1158 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1159 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);