1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
6 * This file is part of the Lustre file system, http://www.lustre.org
7 * Lustre is a trademark of Cluster File Systems, Inc.
9 * You may have signed or agreed to another license before downloading
10 * this software. If so, you are bound by the terms and conditions
11 * of that agreement, and the following does not apply to you. See the
12 * LICENSE file included with this distribution for more information.
14 * If you did not agree to a different license, then this copy of Lustre
15 * is open source software; you can redistribute it and/or modify it
16 * under the terms of version 2 of the GNU General Public License as
17 * published by the Free Software Foundation.
19 * In either case, Lustre is distributed in the hope that it will be
20 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 * license text for more details.
26 # define EXPORT_SYMTAB
28 #define DEBUG_SUBSYSTEM S_MDC
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
36 # include <liblustre.h>
39 #include <linux/lustre_acl.h>
40 #include <obd_class.h>
41 #include <lustre_dlm.h>
42 /* fid_res_name_eq() */
43 #include <lustre_fid.h>
44 #include <lprocfs_status.h>
45 #include "mdc_internal.h"
/*
 * Test disposition bit(s) in a lookup intent.
 *
 * Returns the bitwise AND of the server-filled disposition mask and
 * @flag, i.e. non-zero iff any bit in @flag is set.
 */
int it_disposition(struct lookup_intent *it, int flag)
        return it->d.lustre.it_disposition & flag;
EXPORT_SYMBOL(it_disposition);
/* Set disposition bit(s) @flag in the intent's disposition mask. */
void it_set_disposition(struct lookup_intent *it, int flag)
        it->d.lustre.it_disposition |= flag;
EXPORT_SYMBOL(it_set_disposition);
/* Clear disposition bit(s) @flag from the intent's disposition mask. */
void it_clear_disposition(struct lookup_intent *it, int flag)
        it->d.lustre.it_disposition &= ~flag;
EXPORT_SYMBOL(it_clear_disposition);
/*
 * Return the error status for the requested open/lookup @phase.
 *
 * The checks run from the deepest stage (OPEN) back towards the
 * shallowest (IT_EXECD): for the first disposition bit the server set,
 * it_status is returned when the caller's @phase is at or beyond that
 * stage.  If no disposition bit is set at all, the reply told us
 * nothing and the state is logged via CERROR below.
 */
int it_open_error(int phase, struct lookup_intent *it)
        if (it_disposition(it, DISP_OPEN_OPEN)) {
                if (phase >= DISP_OPEN_OPEN)
                        return it->d.lustre.it_status;
        if (it_disposition(it, DISP_OPEN_CREATE)) {
                if (phase >= DISP_OPEN_CREATE)
                        return it->d.lustre.it_status;
        if (it_disposition(it, DISP_LOOKUP_EXECD)) {
                if (phase >= DISP_LOOKUP_EXECD)
                        return it->d.lustre.it_status;
        if (it_disposition(it, DISP_IT_EXECD)) {
                if (phase >= DISP_IT_EXECD)
                        return it->d.lustre.it_status;
        /* No disposition bits set at all: unexpected reply state. */
        CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
               it->d.lustre.it_status);
EXPORT_SYMBOL(it_open_error);
/* This must be called on a lockh that is known to have a referenced lock.
 *
 * Attaches inode @data to the lock's l_ast_data (under the resource
 * lock) so blocking/cancel callbacks can find the inode later.  If the
 * lock already carries a different inode, that inode must be on its way
 * out (I_FREEING), otherwise two live inodes map to one lock — LASSERTF. */
int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
        struct ldlm_lock *lock;
        lock = ldlm_handle2lock((struct lustre_handle *)lockh);
        LASSERT(lock != NULL);
        lock_res_and_lock(lock);
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        lock->l_ast_data = data;
        unlock_res_and_lock(lock);
/*
 * Look for an already-granted DLM lock on the resource derived from
 * @fid that is compatible with @type/@policy/@mode.  On a match the
 * handle is stored in *lockh; the matched mode is the result of
 * ldlm_lock_match().
 */
ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
                           const struct lu_fid *fid, ldlm_type_t type,
                           ldlm_policy_data_t *policy, ldlm_mode_t mode,
                           struct lustre_handle *lockh)
        struct ldlm_res_id res_id;
        /* Build the ibits resource name from the fid, then match. */
        fid_build_reg_res_name(fid, &res_id);
        rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
                             &res_id, type, policy, mode, lockh);
/*
 * Cancel all unused locks on the resource named by @fid that match
 * @policy/@mode, passing @flags and @opaque through to the LDLM
 * resource-cancel helper.
 */
int mdc_cancel_unused(struct obd_export *exp,
                      const struct lu_fid *fid,
                      ldlm_policy_data_t *policy,
                      ldlm_mode_t mode, int flags, void *opaque)
        struct ldlm_res_id res_id;
        struct obd_device *obd = class_exp2obd(exp);
        fid_build_reg_res_name(fid, &res_id);
        rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
                                             policy, mode, flags, opaque);
/*
 * Iterate over all locks on the resource named by @fid, applying
 * iterator @it with @data (typically used to update l_ast_data on
 * every lock of an inode).
 */
int mdc_change_cbdata(struct obd_export *exp,
                      const struct lu_fid *fid,
                      ldlm_iterator_t it, void *data)
        struct ldlm_res_id res_id;
        fid_build_reg_res_name(fid, &res_id);
        ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
/*
 * Drop the replay flag from a failed request so it is not held for
 * recovery replay; the flag is manipulated under rq_lock.  A non-zero
 * transno on an error reply is unexpected and logged via DEBUG_REQ.
 */
static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
        /* Don't hold error requests for replay. */
        if (req->rq_replay) {
                spin_lock(&req->rq_lock);
                spin_unlock(&req->rq_lock);
        if (rc && req->rq_transno != 0) {
                DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/* Save a large LOV EA into the request buffer so that it is available
 * for replay.  We don't do this in the initial request because the
 * original request doesn't need this buffer (at most it sends just the
 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
 * buffer and may also be difficult to allocate and save a very large
 * request buffer for each open. (bug 5707)
 *
 * OOM here may cause recovery failure if lmm is needed (only for the
 * original open if the MDS crashed just when this client also OOM'd)
 * but this is incredibly unlikely, and questionable whether the client
 * could do MDS recovery under OOM anyways... */
static void mdc_realloc_openmsg(struct ptlrpc_request *req,
                                struct mdt_body *body)
        /* FIXME: remove this explicit offset. */
        rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
        /* Enlarge failed: give up on saving the EA for replay rather
         * than failing the open — drop the EA bits from the body. */
        CERROR("Can't enlarge segment %d size to %d\n",
               DLM_INTENT_REC_OFF + 4, body->eadatasize);
        body->valid &= ~OBD_MD_FLEASIZE;
        body->eadatasize = 0;
/*
 * Build and pack an LDLM_INTENT_OPEN request for @it/@op_data.
 *
 * Collects conflicting locks to cancel (child OPEN locks when the child
 * fid is known, parent UPDATE lock on create/join), sizes the capsule
 * fields (capabilities, name, EA data, join record), then packs the
 * ldlm intent and the open record.  Returns the prepared request, or an
 * ERR_PTR on allocation failure.
 */
static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
                                                   struct lookup_intent *it,
                                                   struct md_op_data *op_data,
                                                   void *lmm, int lmmsize,
        struct ptlrpc_request *req;
        struct obd_device *obddev = class_exp2obd(exp);
        struct ldlm_intent *lit;
        int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
        CFS_LIST_HEAD(cancels);
        /* Force a regular-file type into the create mode. */
        it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
        /* XXX: openlock is not cancelled for cross-refs. */
        /* If inode is known, cancel conflicting OPEN locks. */
        if (fid_is_sane(&op_data->op_fid2)) {
                if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
                else if (it->it_flags & FMODE_EXEC)
                count = mdc_resource_get_unused(exp, &op_data->op_fid2,
        /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
        if (it->it_op & IT_CREAT || joinfile)
        count += mdc_resource_get_unused(exp, &op_data->op_fid1,
                                         MDS_INODELOCK_UPDATE);
        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_LDLM_INTENT_OPEN);
        /* Allocation failed: release the gathered cancel list. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(ERR_PTR(-ENOMEM));
        /* parent capability */
        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
        /* child capability, reserve the size according to parent capa, it will
         * be filled after we get the reply */
        mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                             op_data->op_namelen + 1);
        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
                             max(lmmsize, obddev->u.cli.cl_default_mds_easize));
        req_capsule_set_size(&req->rq_pill, &RMF_REC_JOINFILE,
        rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
                ptlrpc_request_free(req);
        /* For a join, op_data->op_data carries the head file size. */
                __u64 head_size = *(__u64 *)op_data->op_data;
                mdc_join_pack(req, op_data, head_size);
        spin_lock(&req->rq_lock);
        spin_unlock(&req->rq_lock);
        /* pack the intent */
        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
        lit->opc = (__u64)it->it_op;
        /* pack the intended request */
        mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
        /* for remote client, fetch remote perm for current user */
        if (client_is_remote(exp))
                req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
                                     sizeof(struct mdt_remote_perm));
        ptlrpc_request_set_replen(req);
/*
 * Build and pack an LDLM_INTENT_UNLINK request for @it/@op_data.
 *
 * Sizes the name and capability fields, packs the intent opcode and the
 * unlink record, and reserves reply room for the stripe MD and unlink
 * cookies.  Returns the prepared request, or ERR_PTR(-ENOMEM).
 */
static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
                                                     struct lookup_intent *it,
                                                     struct md_op_data *op_data)
        struct ptlrpc_request *req;
        struct obd_device *obddev = class_exp2obd(exp);
        struct ldlm_intent *lit;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_LDLM_INTENT_UNLINK);
                RETURN(ERR_PTR(-ENOMEM));
        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                             op_data->op_namelen + 1);
        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
                ptlrpc_request_free(req);
        /* pack the intent */
        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
        lit->opc = (__u64)it->it_op;
        /* pack the intended request */
        mdc_unlink_pack(req, op_data);
        /* Reserve maximum reply sizes for stripe MD and llog cookies. */
        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
                             obddev->u.cli.cl_max_mds_easize);
        req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
                             obddev->u.cli.cl_max_mds_cookiesize);
        ptlrpc_request_set_replen(req);
/*
 * Build and pack an LDLM_INTENT_GETATTR request for @it/@op_data.
 *
 * The valid mask asks for attributes, EA, capabilities and either
 * remote permissions (remote client) or ACLs (local client).  Reserves
 * reply room for the stripe MD and, for remote clients, the remote
 * permission structure.  Returns the prepared request or ERR_PTR.
 */
static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
                                                      struct lookup_intent *it,
                                                      struct md_op_data *op_data)
        struct ptlrpc_request *req;
        struct obd_device *obddev = class_exp2obd(exp);
        obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
                          OBD_MD_FLMDSCAPA | OBD_MD_MEA |
                          (client_is_remote(exp) ?
                           OBD_MD_FLRMTPERM : OBD_MD_FLACL);
        struct ldlm_intent *lit;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_LDLM_INTENT_GETATTR);
                RETURN(ERR_PTR(-ENOMEM));
        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                             op_data->op_namelen + 1);
        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
                ptlrpc_request_free(req);
        /* pack the intent */
        lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
        lit->opc = (__u64)it->it_op;
        /* pack the intended request */
        mdc_getattr_pack(req, valid, it->it_flags, op_data);
        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
                             obddev->u.cli.cl_max_mds_easize);
        if (client_is_remote(exp))
                req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
                                     sizeof(struct mdt_remote_perm));
        ptlrpc_request_set_replen(req);
/*
 * Build a plain (no-intent) LDLM_ENQUEUE request — used for IT_READDIR
 * where no intent record accompanies the lock request.  Returns the
 * prepared request or ERR_PTR(-ENOMEM).
 */
static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
        struct ptlrpc_request *req;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
                RETURN(ERR_PTR(-ENOMEM));
        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
                ptlrpc_request_free(req);
        ptlrpc_request_set_replen(req);
/*
 * Post-process a completed intent enqueue reply.
 *
 * Copies the server's disposition/status/lock info from the DLM reply
 * into the intent, fixes up the granted lock mode if the server changed
 * it, clears the replay flag for failed opens, and swabs/validates the
 * reply body (EA data, remote perms, capabilities).  For replayable
 * opens the reply LOV EA is copied back into the request buffer so a
 * create can be replayed after recovery.
 */
static int mdc_finish_enqueue(struct obd_export *exp,
                              struct ptlrpc_request *req,
                              struct ldlm_enqueue_info *einfo,
                              struct lookup_intent *it,
                              struct lustre_handle *lockh,
        struct req_capsule *pill = &req->rq_pill;
        struct ldlm_request *lockreq;
        struct ldlm_reply *lockrep;
        /* Similarly, if we're going to replay this request, we don't want to
         * actually get a lock, just perform the intent. */
        if (req->rq_transno || req->rq_replay) {
                lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
                lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
        if (rc == ELDLM_LOCK_ABORTED) {
                /* Intent executed but no lock granted: zero the handle. */
                memset(lockh, 0, sizeof(*lockh));
        } else { /* rc = 0 */
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
                /* If the server gave us back a different lock mode, we should
                 * fix up our variables. */
                if (lock->l_req_mode != einfo->ei_mode) {
                        ldlm_lock_addref(lockh, lock->l_req_mode);
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        einfo->ei_mode = lock->l_req_mode;
        lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
        /* Transfer the server's verdict into the intent. */
        it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
        it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
        it->d.lustre.it_lock_mode = einfo->ei_mode;
        it->d.lustre.it_data = req;
        if (it->d.lustre.it_status < 0 && req->rq_replay)
                mdc_clear_replay_flag(req, it->d.lustre.it_status);
        /* If we're doing an IT_OPEN which did not result in an actual
         * successful open, then we need to remove the bit which saves
         * this request for unconditional replay.
         *
         * It's important that we do this first!  Otherwise we might exit the
         * function without doing so, and try to replay a failed create */
        if (it->it_op & IT_OPEN && req->rq_replay &&
            (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
                mdc_clear_replay_flag(req, it->d.lustre.it_status);
        DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
                  it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
        /* We know what to expect, so we do any byte flipping required here */
        if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                struct mdt_body *body;
                body = req_capsule_server_get(pill, &RMF_MDT_BODY);
                        CERROR ("Can't swab mdt_body\n");
                if (it_disposition(it, DISP_OPEN_OPEN) &&
                    !it_open_error(DISP_OPEN_OPEN, it)) {
                        /*
                         * If this is a successful OPEN request, we need to set
                         * replay handler and data early, so that if replay
                         * happens immediately after swabbing below, new reply
                         * is swabbed by that handler correctly.
                         */
                        mdc_set_open_replay_data(NULL, NULL, req);
                if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
                        /*
                         * The eadata is opaque; just check that it is there.
                         * Eventually, obd_unpackmd() will check the contents.
                         */
                        eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
                        /* Adopt larger server-side EA/cookie size limits. */
                        if (body->valid & OBD_MD_FLMODEASIZE) {
                                struct obd_device *obddev = class_exp2obd(exp);
                                if (obddev->u.cli.cl_max_mds_easize <
                                        obddev->u.cli.cl_max_mds_easize =
                                        CDEBUG(D_INFO, "maxeasize become %d\n",
                                if (obddev->u.cli.cl_max_mds_cookiesize <
                                    body->max_cookiesize) {
                                        obddev->u.cli.cl_max_mds_cookiesize =
                                                body->max_cookiesize;
                                        CDEBUG(D_INFO, "cookiesize become %d\n",
                                               body->max_cookiesize);
                        /*
                         * We save the reply LOV EA in case we have to replay a
                         * create for recovery.  If we didn't allocate a large
                         * enough request buffer above we need to reallocate it
                         * here to hold the actual LOV EA.
                         *
                         * To not save LOV EA if request is not going to replay
                         * (for example error one).
                         */
                        if ((it->it_op & IT_OPEN) && req->rq_replay) {
                                if (req_capsule_get_size(pill, &RMF_EADATA,
                                        mdc_realloc_openmsg(req, body);
                                        req_capsule_set_size(pill, &RMF_EADATA,
                                lmm = req_capsule_client_get(pill, &RMF_EADATA);
                                        memcpy(lmm, eadata, body->eadatasize);
                if (body->valid & OBD_MD_FLRMTPERM) {
                        struct mdt_remote_perm *perm;
                        LASSERT(client_is_remote(exp));
                        perm = req_capsule_server_swab_get(pill, &RMF_ACL,
                                                lustre_swab_mdt_remote_perm);
                if (body->valid & OBD_MD_FLMDSCAPA) {
                        struct lustre_capa *capa, *p;
                        capa = req_capsule_server_get(pill, &RMF_CAPA1);
                        if (it->it_op & IT_OPEN) {
                                /* client fid capa will be checked in replay */
                                p = req_capsule_client_get(pill, &RMF_CAPA2);
                if (body->valid & OBD_MD_FLOSSCAPA) {
                        struct lustre_capa *capa;
                        capa = req_capsule_server_get(pill, &RMF_CAPA2);
/* We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type.
 *
 * Dispatch on the intent op to build the matching intent request
 * (open/unlink/getattr/readdir), send it through ldlm_cli_enqueue()
 * under the rpc lock and in-flight accounting, then finish via
 * mdc_finish_enqueue(). */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                struct lookup_intent *it, struct md_op_data *op_data,
                struct lustre_handle *lockh, void *lmm, int lmmsize,
                int extra_lock_flags)
        struct obd_device *obddev = class_exp2obd(exp);
        struct ptlrpc_request *req;
        struct req_capsule *pill;
        int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
        struct ldlm_res_id res_id;
        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
        LASSERTF(einfo->ei_type == LDLM_IBITS, "lock type %d\n", einfo->ei_type);
        fid_build_reg_res_name(&op_data->op_fid1, &res_id);
        /* Non-lookup ops want the UPDATE ibit instead of LOOKUP. */
        if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
        if (it->it_op & IT_OPEN) {
                int joinfile = !!((it->it_flags & O_JOIN_FILE) &&
                req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
                        einfo->ei_cbdata = NULL;
                it->it_flags &= ~O_JOIN_FILE;
        } else if (it->it_op & IT_UNLINK)
                req = mdc_intent_unlink_pack(exp, it, op_data);
        else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
                req = mdc_intent_getattr_pack(exp, it, op_data);
        else if (it->it_op == IT_READDIR)
                req = ldlm_enqueue_pack(exp);
                RETURN(PTR_ERR(req));
        pill = &req->rq_pill;
        /* It is important to obtain rpc_lock first (if applicable), so that
         * threads that are serialised with rpc_lock are not polluting our
         * rpcs in flight counter */
        mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
        mdc_enter_request(&obddev->u.cli);
        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
        mdc_exit_request(&obddev->u.cli);
        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
        /* On enqueue failure, drop the request and its replay flag. */
                CERROR("ldlm_cli_enqueue: %d\n", rc);
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/*
 * Finish intent-lock processing after the enqueue reply has been
 * parsed into @it: validate the disposition, detect stale fid/name
 * revalidations, take extra request references for successful
 * open/create (released later by ll_file_open/ll_create_node), and
 * collapse a freshly-granted lock onto an already-held matching one.
 */
static int mdc_finish_intent_lock(struct obd_export *exp,
                                  struct ptlrpc_request *request,
                                  struct md_op_data *op_data,
                                  struct lookup_intent *it,
                                  struct lustre_handle *lockh)
        struct lustre_handle old_lock;
        struct mdt_body *mdt_body;
        struct ldlm_lock *lock;
        LASSERT(request != NULL);
        LASSERT(request != LP_POISON);
        LASSERT(request->rq_repmsg != LP_POISON);
        if (!it_disposition(it, DISP_IT_EXECD)) {
                /* The server failed before it even started executing the
                 * intent, i.e. because it couldn't unpack the request. */
                LASSERT(it->d.lustre.it_status != 0);
                RETURN(it->d.lustre.it_status);
        rc = it_open_error(DISP_IT_EXECD, it);
        mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
        LASSERT(mdt_body != NULL); /* mdc_enqueue checked */
        /* If we were revalidating a fid/name pair, mark the intent in
         * case we fail and get called again from lookup */
        if (fid_is_sane(&op_data->op_fid2) &&
            it->it_flags & O_CHECK_STALE &&
            it->it_op != IT_GETATTR) {
                it_set_disposition(it, DISP_ENQ_COMPLETE);
                /* Also: did we find the same inode? */
                /* sever can return one of two fids:
                 * op_fid2 - new allocated fid - if file is created.
                 * op_fid3 - existent fid - if file only open.
                 * op_fid3 is saved in lmv_intent_open */
                if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
                    (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
                        CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
                               "\n", PFID(&op_data->op_fid2),
                               PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
        rc = it_open_error(DISP_LOOKUP_EXECD, it);
        /* keep requests around for the multiple phases of the call
         * this shows the DISP_XX must guarantee we make it into the call */
        if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
            it_disposition(it, DISP_OPEN_CREATE) &&
            !it_open_error(DISP_OPEN_CREATE, it)) {
                it_set_disposition(it, DISP_ENQ_CREATE_REF);
                ptlrpc_request_addref(request); /* balanced in ll_create_node */
        if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
            it_disposition(it, DISP_OPEN_OPEN) &&
            !it_open_error(DISP_OPEN_OPEN, it)) {
                it_set_disposition(it, DISP_ENQ_OPEN_REF);
                ptlrpc_request_addref(request); /* balanced in ll_file_open */
                /* BUG 11546 - eviction in the middle of open rpc processing */
                OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
        if (it->it_op & IT_CREAT) {
                /* XXX this belongs in ll_create_it */
        } else if (it->it_op == IT_OPEN) {
                LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
                LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
        /* If we already have a matching lock, then cancel the new
         * one.  We have to set the data here instead of in
         * mdc_enqueue, because we need to use the child's inode as
         * the l_ast_data to match, and that's not available until
         * intent_finish has performed the iget().) */
        lock = ldlm_handle2lock(lockh);
                ldlm_policy_data_t policy = lock->l_policy_data;
                LDLM_DEBUG(lock, "matching against this");
                /* The granted lock must be on the fid the server replied
                 * with, otherwise something is badly inconsistent. */
                LASSERTF(fid_res_name_eq(&mdt_body->fid1,
                                         &lock->l_resource->lr_name),
                         "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
                         (unsigned long)lock->l_resource->lr_name.name[0],
                         (unsigned long)lock->l_resource->lr_name.name[1],
                         (unsigned long)lock->l_resource->lr_name.name[2],
                         (unsigned long)fid_seq(&mdt_body->fid1),
                         (unsigned long)fid_oid(&mdt_body->fid1),
                         (unsigned long)fid_ver(&mdt_body->fid1));
        memcpy(&old_lock, lockh, sizeof(*lockh));
        if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
                            LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
                /* Already hold an equivalent lock: drop the new one and
                 * keep using the old handle. */
                ldlm_lock_decref_and_cancel(lockh,
                                            it->d.lustre.it_lock_mode);
                memcpy(lockh, &old_lock, sizeof(old_lock));
                it->d.lustre.it_lock_handle = lockh->cookie;
        CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
               op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
               it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * This long block is all about fixing up the lock and request state
 * so that it is correct as of the moment _before_ the operation was
 * applied; that way, the VFS will think that everything is normal and
 * call Lustre's regular VFS methods.
 *
 * If we're performing a creation, that means that unless the creation
 * failed with EEXIST, we should fake up a negative dentry.
 *
 * For everything else, we want to lookup to succeed.
 *
 * One additional note: if CREATE or OPEN succeeded, we add an extra
 * reference to the request because we need to keep it around until
 * ll_create/ll_open gets called.
 *
 * The server will return to us, in it_disposition, an indication of
 * exactly what d.lustre.it_status refers to.
 *
 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it status is the
 * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 *
 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
 */
int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                    void *lmm, int lmmsize, struct lookup_intent *it,
                    int lookup_flags, struct ptlrpc_request **reqp,
                    ldlm_blocking_callback cb_blocking,
                    int extra_lock_flags)
        struct lustre_handle lockh;
        CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
               ", intent: %s flags %#o\n", op_data->op_namelen,
               op_data->op_name, PFID(&op_data->op_fid2),
               PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
        /* Revalidate path: try to satisfy LOOKUP/GETATTR from a lock we
         * already hold on the known child fid. */
        if (fid_is_sane(&op_data->op_fid2) &&
            (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
                /* We could just return 1 immediately, but since we should only
                 * be called in revalidate_it if we already have a lock, let's */
                ldlm_policy_data_t policy;
                /* As not all attributes are kept under update lock, e.g.
                   owner/group/acls are under lookup lock, we need both
                   ibits for GETATTR. */
                /* For CMD, UPDATE lock and LOOKUP lock can not be got
                 * at the same for cross-object, so we can not match
                 * the 2 lock at the same time FIXME: but how to handle
                 * the above situation */
                policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
                        MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
                mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED,
                                      &op_data->op_fid2, LDLM_IBITS, &policy,
                                      LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
                        it->d.lustre.it_lock_handle = lockh.cookie;
                        it->d.lustre.it_lock_mode = mode;
                /* Only return failure if it was not GETATTR by cfid
                   (from inode_revalidate) */
                if (mode || op_data->op_namelen != 0)
        /* lookup_it may be called only after revalidate_it has run, because
         * revalidate_it cannot return errors, only zero.  Returning zero causes
         * this call to lookup, which *can* return an error.
         *
         * We only want to execute the request associated with the intent one
         * time, however, so don't send the request again.  Instead, skip past
         * this and use the request from revalidate.  In this case, revalidate
         * never dropped its reference, so the refcounts are all OK */
        if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
                struct ldlm_enqueue_info einfo =
                        { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
                          ldlm_completion_ast, NULL, NULL };
                /* For case if upper layer did not alloc fid, do it now. */
                if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
                        rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
                                CERROR("Can't alloc new fid, rc %d\n", rc);
                rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
                                 lmm, lmmsize, extra_lock_flags);
                it->d.lustre.it_lock_handle = lockh.cookie;
        } else if (!fid_is_sane(&op_data->op_fid2) ||
                   !(it->it_flags & O_CHECK_STALE)) {
                /* DISP_ENQ_COMPLETE set means there is extra reference on
                 * request referenced from this intent, saved for subsequent
                 * lookup.  This path is executed when we proceed to this
                 * lookup, so we clear DISP_ENQ_COMPLETE */
                it_clear_disposition(it, DISP_ENQ_COMPLETE);
        *reqp = it->d.lustre.it_data;
        rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Interpret callback for an async intent-getattr enqueue.
 *
 * Recovers exp/minfo/einfo from the request's async args, finishes the
 * enqueue via ldlm_cli_enqueue_fini()/mdc_finish_enqueue()/
 * mdc_finish_intent_lock(), then invokes the caller's mi_cb with the
 * final rc.
 */
static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
                                              void *unused, int rc)
        struct obd_export *exp = req->rq_async_args.pointer_arg[0];
        struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
        struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
        struct lookup_intent *it;
        struct lustre_handle *lockh;
        struct obd_device *obddev;
        int flags = LDLM_FL_HAS_INTENT;
        lockh = &minfo->mi_lockh;
        obddev = class_exp2obd(exp);
        /* Balance the mdc_enter_request() done at submit time. */
        mdc_exit_request(&obddev->u.cli);
        if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
        rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
                                   &flags, NULL, 0, NULL, lockh, rc);
                CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
                mdc_clear_replay_flag(req, rc);
        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
        it->d.lustre.it_lock_handle = lockh->cookie;
        rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
        /* Hand the result to the caller-provided completion callback. */
        minfo->mi_cb(req, minfo, rc);
/*
 * Submit an intent-getattr enqueue asynchronously via ptlrpcd.
 *
 * Packs the getattr intent request, starts the enqueue with
 * async = 1, stashes exp/minfo/einfo in the request's async args for
 * mdc_intent_getattr_async_interpret(), and queues the request.
 */
int mdc_intent_getattr_async(struct obd_export *exp,
                             struct md_enqueue_info *minfo,
                             struct ldlm_enqueue_info *einfo)
        struct md_op_data *op_data = &minfo->mi_data;
        struct lookup_intent *it = &minfo->mi_it;
        struct ptlrpc_request *req;
        struct obd_device *obddev = class_exp2obd(exp);
        struct ldlm_res_id res_id;
        ldlm_policy_data_t policy = {
                .l_inodebits = { MDS_INODELOCK_LOOKUP }
        int flags = LDLM_FL_HAS_INTENT;
        CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
               op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
               ldlm_it2str(it->it_op), it->it_flags);
        fid_build_reg_res_name(&op_data->op_fid1, &res_id);
        req = mdc_intent_getattr_pack(exp, it, op_data);
        /* Count this RPC in-flight; released in the interpret callback. */
        mdc_enter_request(&obddev->u.cli);
        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
                              0, NULL, &minfo->mi_lockh, 1);
        mdc_exit_request(&obddev->u.cli);
        req->rq_async_args.pointer_arg[0] = exp;
        req->rq_async_args.pointer_arg[1] = minfo;
        req->rq_async_args.pointer_arg[2] = einfo;
        req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
        ptlrpcd_add_req(req);
990 int mdc_revalidate_lock(struct obd_export *exp,
991 struct lookup_intent *it,
994 /* We could just return 1 immediately, but since we should only
995 * be called in revalidate_it if we already have a lock, let's
997 struct ldlm_res_id res_id;
998 struct lustre_handle lockh;
999 ldlm_policy_data_t policy;
1003 fid_build_reg_res_name(fid, &res_id);
1004 /* As not all attributes are kept under update lock, e.g.
1005 owner/group/acls are under lookup lock, we need both
1006 ibits for GETATTR. */
1007 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
1008 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
1009 MDS_INODELOCK_LOOKUP;
1011 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1012 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
1013 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
1015 it->d.lustre.it_lock_handle = lockh.cookie;
1016 it->d.lustre.it_lock_mode = mode;