1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <obd_class.h>
52 #include <lustre_dlm.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/* Test disposition bits on an intent.
 *
 * Returns the bitwise AND of it->d.lustre.it_disposition and @flag, i.e. a
 * non-zero value when at least one bit of @flag is set and zero otherwise. */
56 int it_disposition(struct lookup_intent *it, int flag)
58 return it->d.lustre.it_disposition & flag;
60 EXPORT_SYMBOL(it_disposition);
/* Set the bits of @flag in it->d.lustre.it_disposition (bitwise OR); bits
 * that are not part of @flag keep their current value. */
62 void it_set_disposition(struct lookup_intent *it, int flag)
64 it->d.lustre.it_disposition |= flag;
66 EXPORT_SYMBOL(it_set_disposition);
/* Clear the bits of @flag in it->d.lustre.it_disposition (AND with the
 * complement of @flag); all other bits keep their current value. */
68 void it_clear_disposition(struct lookup_intent *it, int flag)
70 it->d.lustre.it_disposition &= ~flag;
72 EXPORT_SYMBOL(it_clear_disposition);
/* Return the intent status that applies to @phase of an open.
 *
 * The visible checks test the dispositions in the order DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD. For a disposition
 * that is set on @it, it->d.lustre.it_status is returned when @phase is
 * greater than or equal to that disposition constant.
 *
 * NOTE(review): the embedded line numbers skip inside this function, so the
 * fall-through paths are not visible here -- what is returned when a phase
 * comparison fails, and what follows the CERROR that dumps the disposition
 * and status, must be confirmed against the complete file. */
74 int it_open_error(int phase, struct lookup_intent *it)
76 if (it_disposition(it, DISP_OPEN_OPEN)) {
77 if (phase >= DISP_OPEN_OPEN)
78 return it->d.lustre.it_status;
83 if (it_disposition(it, DISP_OPEN_CREATE)) {
84 if (phase >= DISP_OPEN_CREATE)
85 return it->d.lustre.it_status;
90 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
91 if (phase >= DISP_LOOKUP_EXECD)
92 return it->d.lustre.it_status;
97 if (it_disposition(it, DISP_IT_EXECD)) {
98 if (phase >= DISP_IT_EXECD)
99 return it->d.lustre.it_status;
103 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
104 it->d.lustre.it_status);
108 EXPORT_SYMBOL(it_open_error);
110 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach @data as the l_ast_data of the lock identified by @l.
 *
 * @l is reinterpreted as a struct lustre_handle and resolved with
 * ldlm_handle2lock(); the result is asserted to be non-NULL. While holding
 * lock_res_and_lock(), if the lock already carries a non-NULL l_ast_data
 * that differs from @data, both pointers are treated as struct inode and it
 * is asserted that the old inode has I_FREEING set in i_state. @data is then
 * stored in l_ast_data and the lock is unlocked again.
 *
 * NOTE(review): if ldlm_handle2lock() takes a reference on the lock, its
 * release is not visible in this copy (the embedded line numbers skip);
 * confirm against the complete file. */
111 void mdc_set_lock_data(__u64 *l, void *data)
113 struct ldlm_lock *lock;
114 struct lustre_handle *lockh = (struct lustre_handle *)l;
122 lock = ldlm_handle2lock(lockh);
124 LASSERT(lock != NULL);
125 lock_res_and_lock(lock);
127 if (lock->l_ast_data && lock->l_ast_data != data) {
128 struct inode *new_inode = data;
129 struct inode *old_inode = lock->l_ast_data;
130 LASSERTF(old_inode->i_state & I_FREEING,
131 "Found existing inode %p/%lu/%u state %lu in lock: "
132 "setting data to %p/%lu/%u\n", old_inode,
133 old_inode->i_ino, old_inode->i_generation,
135 new_inode, new_inode->i_ino, new_inode->i_generation);
138 lock->l_ast_data = data;
139 unlock_res_and_lock(lock);
144 EXPORT_SYMBOL(mdc_set_lock_data);
/* Iterate over the locks of the resource named by @fid.
 *
 * The resource id is built with fid_build_reg_res_name() (the ll_fid is cast
 * to a struct lu_fid for that call) and ldlm_resource_iterate() is invoked
 * on the namespace of the obd device behind @exp.
 *
 * NOTE(review): the remaining arguments of ldlm_resource_iterate() and the
 * return statement are missing from this copy; presumably @it and @data are
 * forwarded as iterator and closure -- confirm. */
146 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
147 ldlm_iterator_t it, void *data)
149 struct ldlm_res_id res_id;
152 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
153 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
/* Helper used on error paths so that a failed request is not kept for
 * replay.
 *
 * When req->rq_replay is set, req->rq_lock is taken and released; the
 * statement executed under the lock is missing from this copy
 * (NOTE(review): presumably it clears rq_replay -- confirm). The visible
 * code also reports, via DEBUG_REQ at D_ERROR level, the case where @rc is
 * non-zero although the request carries a non-zero transno. */
160 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
162 /* Don't hold error requests for replay. */
163 if (req->rq_replay) {
164 spin_lock(&req->rq_lock);
166 spin_unlock(&req->rq_lock);
168 if (rc && req->rq_transno != 0) {
169 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/* NOTE(review): only the signature of this helper is present in this copy.
 * Its single visible caller, mdc_intent_open_pack(), adds
 * "round_up(rc) - rc" to a buffer size and its comment talks about growing
 * the request to the next power-of-two size, so this presumably rounds @val
 * up to a power of two -- confirm against the complete file. */
174 static int round_up(int val)
184 /* Save a large LOV EA into the request buffer so that it is available
185 * for replay. We don't do this in the initial request because the
186 * original request doesn't need this buffer (at most it sends just the
187 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
188 * buffer and may also be difficult to allocate and save a very large
189 * request buffer for each open. (bug 5707)
191 * OOM here may cause recovery failure if lmm is needed (only for the
192 * original open if the MDS crashed just when this client also OOM'd)
193 * but this is incredibly unlikely, and questionable whether the client
194 * could do MDS recovery under OOM anyways... */
/* Visible mechanics: the current length of buffer
 * DLM_INTENT_REC_OFF + offset and the current packed message size are
 * remembered, the buffer length is changed, and the new packed size is
 * computed from the old message header. A message of that size is
 * allocated with OBD_ALLOC(); on success the old contents (old_size bytes)
 * are copied over, rq_reqmsg and rq_reqlen are switched under rq_lock and
 * the old message is freed. The trailing statements restore the old buffer
 * length and strip OBD_MD_FLEASIZE / eadatasize from @body.
 *
 * NOTE(review): the declaration and value of `offset`, the new buffer
 * length, and the branch structure around the trailing statements
 * (presumably the allocation-failure path) are missing from this copy. */
195 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
196 struct mds_body *body)
198 int old_len, new_size, old_size;
199 struct lustre_msg *old_msg = req->rq_reqmsg;
200 struct lustre_msg *new_msg;
203 if (mdc_req_is_2_0_server(req))
208 old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + offset);
209 old_size = lustre_packed_msg_size(old_msg);
210 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + offset,
212 new_size = lustre_packed_msg_size(old_msg);
214 OBD_ALLOC(new_msg, new_size);
215 if (new_msg != NULL) {
216 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
218 memcpy(new_msg, old_msg, old_size);
220 spin_lock(&req->rq_lock);
221 req->rq_reqmsg = new_msg;
222 req->rq_reqlen = new_size;
223 spin_unlock(&req->rq_lock);
225 OBD_FREE(old_msg, old_size);
227 lustre_msg_set_buflen(old_msg,
228 DLM_INTENT_REC_OFF + offset, old_len);
229 body->valid &= ~OBD_MD_FLEASIZE;
230 body->eadatasize = 0;
/* Build the enqueue request for an IT_OPEN intent.
 *
 * Visible steps:
 *  - size[] describes the request buffers (ptlrpc body, ldlm request,
 *    intent, create record, name, LOV EA) and repsize[] the reply buffers;
 *    the EA request buffer is at least cl_default_mds_easize bytes;
 *  - it->it_create_mode is forced to S_IFREG;
 *  - for a 2.0 server the create record uses struct mdt_rec_create, the
 *    name and EA buffers move up by two slots to make room for two
 *    zero-sized capa buffers, and two struct lustre_capa reply buffers are
 *    added;
 *  - the last request buffer is grown by the slack "round_up(rc) - rc",
 *    capped at cl_max_mds_easize;
 *  - unused conflicting locks are collected on the `cancels` list with
 *    mdc_resource_get_unused(): MDS_INODELOCK_OPEN on data->fid2 and
 *    MDS_INODELOCK_UPDATE on data->fid1;
 *  - a join request gets an extra join record buffer and is packed with
 *    mdc_join_pack(); otherwise O_JOIN_FILE is cleared from it->it_flags;
 *  - the intent opcode and the open record are packed and the reply sizes
 *    are registered with ptlrpc_req_set_repsize().
 *
 * NOTE(review): several declarations (rc, bufcount, repbufcount, mode,
 * count), the conditions selecting the lock modes, the statement under
 * rq_lock, the allocation-failure handling and the return statement are
 * missing from this copy. */
234 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
235 struct lookup_intent *it,
236 struct mdc_op_data *data,
237 void *lmm, __u32 lmmsize)
239 struct ptlrpc_request *req;
240 struct ldlm_intent *lit;
241 struct obd_device *obddev = class_exp2obd(exp);
242 __u32 size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
243 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
244 [DLM_INTENT_IT_OFF] = sizeof(*lit),
245 [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create),
246 [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
247 /* As an optimization, we allocate an RPC request buffer
248 * for at least a default-sized LOV EA even if we aren't
249 * sending one. We grow the whole request to the next
250 * power-of-two size since we get that much from a slab
251 * allocation anyways. This avoids an allocation below
252 * in the common case where we need to save a
253 * default-sized LOV EA for open replay. */
254 [DLM_INTENT_REC_OFF+2]= max(lmmsize,
255 obddev->u.cli.cl_default_mds_easize) };
256 __u32 repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
257 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
258 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
259 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
261 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
262 CFS_LIST_HEAD(cancels);
263 int do_join = (it->it_flags & O_JOIN_FILE) && data->data;
271 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
272 if (mdc_exp_is_2_0_server(exp)) {
273 size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create);
274 size[DLM_INTENT_REC_OFF+4] = size[DLM_INTENT_REC_OFF+2];
275 size[DLM_INTENT_REC_OFF+3] = size[DLM_INTENT_REC_OFF+1];
276 size[DLM_INTENT_REC_OFF+2] = 0; /* capa */
277 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
279 repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa);
280 repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa);
283 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
286 size[bufcount - 1] = min(size[bufcount - 1] + round_up(rc) - rc,
287 (__u32)obddev->u.cli.cl_max_mds_easize);
289 /* If inode is known, cancel conflicting OPEN locks. */
291 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
294 else if (it->it_flags & FMODE_EXEC)
299 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
300 mode, MDS_INODELOCK_OPEN);
303 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
304 if (it->it_op & IT_CREAT || do_join)
308 count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
309 MDS_INODELOCK_UPDATE);
311 __u64 head_size = (*(__u64 *)data->data);
312 /* join is like an unlink of the tail */
313 if (mdc_exp_is_2_0_server(exp)) {
314 size[DLM_INTENT_REC_OFF+5]=sizeof(struct mdt_rec_join);
316 size[DLM_INTENT_REC_OFF+3]=sizeof(struct mds_rec_join);
320 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
322 mdc_join_pack(req, bufcount - 1, data, head_size);
324 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
325 it->it_flags &= ~O_JOIN_FILE;
329 spin_lock(&req->rq_lock);
331 spin_unlock(&req->rq_lock);
333 /* pack the intent */
334 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
336 lit->opc = (__u64)it->it_op;
338 /* pack the intended request */
339 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
340 it->it_create_mode, 0, it->it_flags,
343 ptlrpc_req_set_repsize(req, repbufcount, repsize);
/* Build the enqueue request for an IT_UNLINK intent.
 *
 * The request has five buffers: ptlrpc body, ldlm request, intent, unlink
 * record (struct mdt_rec_unlink for a 2.0 server, struct mds_rec_unlink
 * otherwise) and the name (data->namelen + 1 bytes). The reply is also
 * declared with five buffers, the last one sized by cl_max_mds_cookiesize.
 * The request is prepared with ldlm_prep_enqueue_req() without a cancel
 * list, the intent opcode is set from it->it_op, the unlink record is
 * packed with mdc_unlink_pack() and the reply sizes are registered.
 *
 * NOTE(review): the handling of a failed ldlm_prep_enqueue_req() and the
 * return statement are missing from this copy (presumably `req` is
 * returned -- confirm). */
348 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
349 struct lookup_intent *it,
350 struct mdc_op_data *data)
352 struct ptlrpc_request *req;
353 struct ldlm_intent *lit;
354 struct obd_device *obddev = class_exp2obd(exp);
355 __u32 size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
356 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
357 [DLM_INTENT_IT_OFF] = sizeof(*lit),
358 [DLM_INTENT_REC_OFF] = mdc_exp_is_2_0_server(exp) ?
359 sizeof(struct mdt_rec_unlink) :
360 sizeof(struct mds_rec_unlink),
361 [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
362 __u32 repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
363 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
364 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
365 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
367 [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
368 cl_max_mds_cookiesize };
371 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
373 /* pack the intent */
374 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
376 lit->opc = (__u64)it->it_op;
378 /* pack the intended request */
379 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
381 ptlrpc_req_set_repsize(req, 5, repsize);
/* Build the enqueue request for an IT_LOOKUP / IT_GETATTR intent.
 *
 * The request carries ptlrpc body, ldlm request, intent, a struct mdt_body
 * and the name (data->namelen + 1 bytes). For a 2.0 server the name moves
 * one slot up and the slot it leaves behind becomes a zero-sized capa
 * buffer. The getattr body is packed with mdc_getattr_pack() using the
 * valid mask OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
 * OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA, and the reply layout in repsize[]
 * reserves room for an ACL of LUSTRE_POSIX_ACL_MAX_SIZE bytes.
 *
 * NOTE(review): the declaration and value of `bufcount`, the handling of a
 * failed ldlm_prep_enqueue_req() and the return statement are missing from
 * this copy. */
386 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
387 struct lookup_intent *it,
388 struct mdc_op_data *data)
390 struct ptlrpc_request *req;
391 struct ldlm_intent *lit;
392 struct obd_device *obddev = class_exp2obd(exp);
393 __u32 size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
394 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
395 [DLM_INTENT_IT_OFF] = sizeof(*lit),
396 [DLM_INTENT_REC_OFF] = sizeof(struct mdt_body),
397 [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
398 [DLM_INTENT_REC_OFF+2]= 0 };
399 __u32 repsize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
400 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
401 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
402 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
404 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
405 [DLM_REPLY_REC_OFF+3] = 0 };
406 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
407 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
411 if (mdc_exp_is_2_0_server(exp)) {
412 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
413 size[DLM_INTENT_REC_OFF+2] = data->namelen + 1;
416 req = ldlm_prep_enqueue_req(exp, bufcount, size, NULL, 0);
418 /* pack the intent */
419 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
421 lit->opc = (__u64)it->it_op;
423 /* pack the intended request */
424 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
426 ptlrpc_req_set_repsize(req, bufcount, repsize);
/* Build the enqueue request used for IT_READDIR.
 *
 * The request holds only the ptlrpc body and the ldlm request (two
 * buffers, no intent record). The reply is declared with three buffers:
 * ptlrpc body, ldlm reply and a buffer sized as struct ost_lvb.
 *
 * NOTE(review): the handling of a failed ldlm_prep_enqueue_req() and the
 * return statement are missing from this copy. */
431 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
433 struct ptlrpc_request *req;
434 __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
435 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
436 __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
437 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
438 [DLM_REPLY_REC_OFF] = sizeof(struct ost_lvb) };
441 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
443 ptlrpc_req_set_repsize(req, 3, repsize);
/* Post-process the reply of an intent enqueue and record its outcome in
 * @it.
 *
 * Visible steps, in order:
 *  - if the request has a transno or is marked for replay, set
 *    LDLM_FL_INTENT_ONLY in the packed lock request;
 *  - on ELDLM_LOCK_ABORTED zero @lockh; otherwise, if the lock's l_req_mode
 *    differs from einfo->ei_mode, take a reference in the new mode, drop the
 *    one in the old mode and update einfo->ei_mode;
 *  - copy lock_policy_res1 / lock_policy_res2 of the lock reply into the
 *    intent's disposition and status, and store the lock mode, the handle
 *    cookie and @req in it->d.lustre;
 *  - clear the replay flag when the status is negative on a replayable
 *    request, and for an IT_OPEN that did not end in a successful open;
 *  - for IT_OPEN, IT_UNLINK, IT_LOOKUP and IT_GETATTR swab the mds_body
 *    reply buffer, register open replay data for a successful open and,
 *    when OBD_MD_FLEASIZE is set, check that the EA buffer is present;
 *  - for IT_OPEN copy the returned EA into the request buffer, calling
 *    mdc_realloc_openmsg() first when the comparison on the current buffer
 *    length requires it.
 *
 * NOTE(review): the last parameter, several declarations (eadata, lmm), the
 * error-return statements and the final return are missing from this copy,
 * so the returned values cannot be determined from what is visible. */
447 static int mdc_finish_enqueue(struct obd_export *exp,
448 struct ptlrpc_request *req,
449 struct ldlm_enqueue_info *einfo,
450 struct lookup_intent *it,
451 struct lustre_handle *lockh,
454 struct ldlm_request *lockreq;
455 struct ldlm_reply *lockrep;
459 /* Similarly, if we're going to replay this request, we don't want to
460 * actually get a lock, just perform the intent. */
461 if (req->rq_transno || req->rq_replay) {
462 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
464 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
467 if (rc == ELDLM_LOCK_ABORTED) {
469 memset(lockh, 0, sizeof(*lockh));
471 } else { /* rc = 0 */
472 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
475 /* If the server gave us back a different lock mode, we should
476 * fix up our variables. */
477 if (lock->l_req_mode != einfo->ei_mode) {
478 ldlm_lock_addref(lockh, lock->l_req_mode);
479 ldlm_lock_decref(lockh, einfo->ei_mode);
480 einfo->ei_mode = lock->l_req_mode;
485 lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
487 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
488 /* swabbed by ldlm_cli_enqueue() */
489 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
491 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
492 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
493 it->d.lustre.it_lock_mode = einfo->ei_mode;
494 it->d.lustre.it_lock_handle = lockh->cookie;
495 it->d.lustre.it_data = req;
497 if (it->d.lustre.it_status < 0 && req->rq_replay)
498 mdc_clear_replay_flag(req, it->d.lustre.it_status);
500 /* If we're doing an IT_OPEN which did not result in an actual
501 * successful open, then we need to remove the bit which saves
502 * this request for unconditional replay.
504 * It's important that we do this first! Otherwise we might exit the
505 * function without doing so, and try to replay a failed create
507 if ((it->it_op & IT_OPEN) &&
509 (!it_disposition(it, DISP_OPEN_OPEN) ||
510 it->d.lustre.it_status != 0))
511 mdc_clear_replay_flag(req, it->d.lustre.it_status);
513 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
514 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
516 /* We know what to expect, so we do any byte flipping required here */
517 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
518 struct mds_body *body;
520 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
521 lustre_swab_mds_body);
523 CERROR ("Can't swab mds_body\n");
527 /* If this is a successful OPEN request, we need to set
528 replay handler and data early, so that if replay happens
529 immediately after swabbing below, new reply is swabbed
530 by that handler correctly */
531 if (it_disposition(it, DISP_OPEN_OPEN) &&
532 !it_open_error(DISP_OPEN_OPEN, it))
533 mdc_set_open_replay_data(NULL, req);
535 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
538 /* The eadata is opaque; just check that it is there.
539 * Eventually, obd_unpackmd() will check the contents */
540 eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
541 body->eadatasize, NULL);
542 if (eadata == NULL) {
543 CERROR ("Missing/short eadata\n");
546 /* We save the reply LOV EA in case we have to replay
547 * a create for recovery. If we didn't allocate a
548 * large enough request buffer above we need to
549 * reallocate it here to hold the actual LOV EA. */
550 if (it->it_op & IT_OPEN) {
551 int offset = DLM_INTENT_REC_OFF;
554 if (mdc_req_is_2_0_server(req))
559 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
561 mdc_realloc_openmsg(req, body);
563 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
566 memcpy(lmm, eadata, body->eadatasize);
574 /* We always reserve enough space in the reply packet for a stripe MD, because
575 * we don't know in advance the file type. */
/* Build and send the intent enqueue that matches it->it_op.
 *
 * The resource name is derived from data->fid1. The inodebits policy starts
 * as MDS_INODELOCK_LOOKUP and is switched to MDS_INODELOCK_UPDATE for
 * IT_UNLINK, IT_GETATTR, IT_READDIR and for opens carrying O_JOIN_FILE.
 * The request is packed by mdc_intent_open_pack(),
 * mdc_intent_unlink_pack(), mdc_intent_lookup_pack() or
 * mdc_intent_readdir_pack(); any other it_op is reported with CERROR. For
 * IT_OPEN with IT_CREAT against a 2.0 server, data->fid2 is first saved in
 * data->fid3 and a new fid is allocated into data->fid2 with
 * mdc_fid_alloc().
 *
 * ldlm_cli_enqueue() is called between mdc_get_rpc_lock() /
 * mdc_enter_request() and mdc_exit_request() / mdc_put_rpc_lock(). The
 * visible failure handling logs the rc, clears the replay flag and
 * finishes the request; otherwise the result is handed to
 * mdc_finish_enqueue().
 *
 * NOTE(review): the declaration of rc, the conditions guarding the error
 * paths and the return statements are missing from this copy. */
576 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
577 struct lookup_intent *it, struct mdc_op_data *data,
578 struct lustre_handle *lockh, void *lmm, int lmmsize,
579 int extra_lock_flags)
581 struct ptlrpc_request *req;
582 struct obd_device *obddev = class_exp2obd(exp);
583 struct ldlm_res_id res_id;
584 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
585 int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
589 fid_build_reg_res_name((void *)&data->fid1, &res_id);
590 LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
591 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
592 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
594 if (it->it_op & IT_OPEN) {
595 if ((it->it_op & IT_CREAT) && mdc_exp_is_2_0_server(exp)) {
596 struct client_obd *cli = &obddev->u.cli;
597 data->fid3 = data->fid2;
598 rc = mdc_fid_alloc(cli->cl_seq, (void *)&data->fid2);
600 CERROR("fid allocation result: %d\n", rc);
604 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
605 if (it->it_flags & O_JOIN_FILE) {
606 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
608 } else if (it->it_op & IT_UNLINK) {
609 req = mdc_intent_unlink_pack(exp, it, data);
610 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
611 req = mdc_intent_lookup_pack(exp, it, data);
612 } else if (it->it_op == IT_READDIR) {
613 req = mdc_intent_readdir_pack(exp);
615 CERROR("bad it_op %x\n", it->it_op);
622 /* It is important to obtain rpc_lock first (if applicable), so that
623 * threads that are serialised with rpc_lock are not polluting our
624 * rpcs in flight counter */
625 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
626 mdc_enter_request(&obddev->u.cli);
627 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
629 mdc_exit_request(&obddev->u.cli);
630 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
632 CERROR("ldlm_cli_enqueue: %d\n", rc);
633 mdc_clear_replay_flag(req, rc);
634 ptlrpc_req_finished(req);
637 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
641 EXPORT_SYMBOL(mdc_enqueue);
/* Look for an already-held inodebits lock that covers the intent.
 *
 * The resource name is built from `fid` (that parameter's declaration is
 * missing from this copy). IT_GETATTR asks for both MDS_INODELOCK_UPDATE
 * and MDS_INODELOCK_LOOKUP bits, every other op only for
 * MDS_INODELOCK_LOOKUP. ldlm_lock_match() is called with
 * LDLM_FL_BLOCK_GRANTED and the mode mask LCK_CR|LCK_CW|LCK_PR|LCK_PW; the
 * resulting handle and mode are stored in it->d.lustre.it_lock_handle and
 * it->d.lustre.it_lock_mode.
 *
 * NOTE(review): the test guarding the handle/mode stores, the declaration
 * of `mode` and the return statement are not visible here; confirm the
 * exact return value against the complete file. */
643 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
646 /* We could just return 1 immediately, but since we should only
647 * be called in revalidate_it if we already have a lock, let's
649 struct ldlm_res_id res_id;
650 struct lustre_handle lockh;
651 ldlm_policy_data_t policy;
655 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
656 /* As not all attributes are kept under update lock, e.g.
657 owner/group/acls are under lookup lock, we need both
658 ibits for GETATTR. */
659 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
660 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
661 MDS_INODELOCK_LOOKUP;
663 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
664 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
665 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
667 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
668 it->d.lustre.it_lock_mode = mode;
673 EXPORT_SYMBOL(mdc_revalidate_lock);
/* Interpret the intent reply of @req and fix up request references and the
 * lock handle.
 *
 * Visible behaviour:
 *  - if DISP_IT_EXECD is not set, the (asserted non-zero) it_status is
 *    returned;
 *  - when data->fid2.id is set, the op is not IT_GETATTR and either the
 *    server is not 2.0 or O_CHECK_STALE is set in it->it_flags,
 *    DISP_ENQ_COMPLETE is set and the fid in the reply body is compared
 *    with data->fid2 and data->fid3;
 *  - an extra request reference is taken once for a successful create
 *    (tracked by DISP_ENQ_CREATE_REF) and once for a successful open
 *    (tracked by DISP_ENQ_OPEN_REF);
 *  - the policy of the lock behind @lockh is used with ldlm_lock_match();
 *    when that call reports a match, the new lock is released with
 *    ldlm_lock_decref_and_cancel() and both @lockh and
 *    it->d.lustre.it_lock_handle are overwritten with the matched handle.
 *
 * NOTE(review): the checks on the results of it_open_error() and of the fid
 * comparison, the declaration of rc and the return statements are missing
 * from this copy. */
675 static int mdc_finish_intent_lock(struct obd_export *exp,
676 struct ptlrpc_request *req,
677 struct mdc_op_data *data,
678 struct lookup_intent *it,
679 struct lustre_handle *lockh)
681 struct mds_body *mds_body;
682 struct lustre_handle old_lock;
683 struct ldlm_lock *lock;
687 LASSERT(req != NULL);
688 LASSERT(req != LP_POISON);
689 LASSERT(req->rq_repmsg != LP_POISON);
691 if (!it_disposition(it, DISP_IT_EXECD)) {
692 /* The server failed before it even started executing the
693 * intent, i.e. because it couldn't unpack the request. */
694 LASSERT(it->d.lustre.it_status != 0);
695 RETURN(it->d.lustre.it_status);
697 rc = it_open_error(DISP_IT_EXECD, it);
701 mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
703 /* mdc_enqueue checked */
704 LASSERT(mds_body != NULL);
705 /* mdc_enqueue swabbed */
706 LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
708 /* If we were revalidating a fid/name pair, mark the intent in
709 * case we fail and get called again from lookup */
711 if (data->fid2.id && (it->it_op != IT_GETATTR) &&
712 ( !mdc_exp_is_2_0_server(exp) ||
713 (mdc_exp_is_2_0_server(exp) && (it->it_flags & O_CHECK_STALE)))) {
714 it_set_disposition(it, DISP_ENQ_COMPLETE);
716 /* Also: did we find the same inode? */
717 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)) &&
718 memcmp(&data->fid3, &mds_body->fid1, sizeof(data->fid3)))
722 rc = it_open_error(DISP_LOOKUP_EXECD, it);
726 /* keep requests around for the multiple phases of the call
727 * this shows the DISP_XX must guarantee we make it into the call
729 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
730 it_disposition(it, DISP_OPEN_CREATE) &&
731 !it_open_error(DISP_OPEN_CREATE, it)) {
732 it_set_disposition(it, DISP_ENQ_CREATE_REF);
733 ptlrpc_request_addref(req); /* balanced in ll_create_node */
735 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
736 it_disposition(it, DISP_OPEN_OPEN) &&
737 !it_open_error(DISP_OPEN_OPEN, it)) {
738 it_set_disposition(it, DISP_ENQ_OPEN_REF);
739 ptlrpc_request_addref(req); /* balanced in ll_file_open */
740 /* BUG 11546 - eviction in the middle of open rpc processing */
741 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
744 if (it->it_op & IT_CREAT) {
745 /* XXX this belongs in ll_create_it */
746 } else if (it->it_op == IT_OPEN) {
747 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
749 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
752 /* If we already have a matching lock, then cancel the new
753 * one. We have to set the data here instead of in
754 * mdc_enqueue, because we need to use the child's inode as
755 * the l_ast_data to match, and that's not available until
756 * intent_finish has performed the iget().) */
757 lock = ldlm_handle2lock(lockh);
759 ldlm_policy_data_t policy = lock->l_policy_data;
761 LDLM_DEBUG(lock, "matching against this");
763 memcpy(&old_lock, lockh, sizeof(*lockh));
764 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
765 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
766 ldlm_lock_decref_and_cancel(lockh,
767 it->d.lustre.it_lock_mode);
768 memcpy(lockh, &old_lock, sizeof(old_lock));
769 memcpy(&it->d.lustre.it_lock_handle, lockh,
774 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
775 data->namelen, data->name, ldlm_it2str(it->it_op),
776 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/* Entry point for intent-based locking.
 *
 * Visible flow: for IT_LOOKUP / IT_GETATTR with a non-zero
 * op_data->fid2.id an existing lock is tried first through
 * mdc_revalidate_lock(). If DISP_ENQ_COMPLETE is not set on @it, a new
 * enqueue is issued with mdc_enqueue() using an LDLM_IBITS einfo whose mode
 * comes from it_to_lock_mode() and whose callbacks are @cb_blocking and
 * ldlm_completion_ast; otherwise, when op_data->fid2.id is zero,
 * DISP_ENQ_COMPLETE is cleared. Finally *reqp is set to the request saved
 * in it->d.lustre.it_data and mdc_finish_intent_lock() is run on it.
 *
 * NOTE(review): the opening marker and some lines of the explanatory text
 * that follows, the declaration of rc, and the early-return statements are
 * missing from this copy. */
781 * This long block is all about fixing up the lock and request state
782 * so that it is correct as of the moment _before_ the operation was
783 * applied; that way, the VFS will think that everything is normal and
784 * call Lustre's regular VFS methods.
786 * If we're performing a creation, that means that unless the creation
787 * failed with EEXIST, we should fake up a negative dentry.
789 * For everything else, we want to lookup to succeed.
791 * One additional note: if CREATE or OPEN succeeded, we add an extra
792 * reference to the request because we need to keep it around until
793 * ll_create/ll_open gets called.
795 * The server will return to us, in it_disposition, an indication of
796 * exactly what d.lustre.it_status refers to.
798 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
799 * otherwise if DISP_OPEN_CREATE is set, then it status is the
800 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
801 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
804 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
807 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
808 void *lmm, int lmmsize, struct lookup_intent *it,
809 int lookup_flags, struct ptlrpc_request **reqp,
810 ldlm_blocking_callback cb_blocking, int extra_lock_flags)
812 struct lustre_handle lockh;
818 CDEBUG(D_DLMTRACE,"name: %.*s("DFID") in inode ("DFID"), "
819 "intent: %s flags %#o\n",
820 op_data->namelen, op_data->name,
821 PFID(((void *)&op_data->fid2)),
822 PFID(((void *)&op_data->fid1)),
823 ldlm_it2str(it->it_op), it->it_flags);
826 if (op_data->fid2.id &&
827 (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
828 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
829 /* Only return failure if it was not GETATTR by cfid
830 (from inode_revalidate) */
831 if (rc || op_data->namelen != 0)
835 /* lookup_it may be called only after revalidate_it has run, because
836 * revalidate_it cannot return errors, only zero. Returning zero causes
837 * this call to lookup, which *can* return an error.
839 * We only want to execute the request associated with the intent one
840 * time, however, so don't send the request again. Instead, skip past
841 * this and use the request from revalidate. In this case, revalidate
842 * never dropped its reference, so the refcounts are all OK */
843 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
844 struct ldlm_enqueue_info einfo =
845 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
846 ldlm_completion_ast, NULL, NULL };
848 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
849 lmm, lmmsize, extra_lock_flags);
852 } else if (!op_data->fid2.id) {
853 /* DISP_ENQ_COMPLETE set means there is extra reference on
854 * request referenced from this intent, saved for subsequent
855 * lookup. This path is executed when we proceed to this
856 * lookup, so we clear DISP_ENQ_COMPLETE */
857 it_clear_disposition(it, DISP_ENQ_COMPLETE);
860 *reqp = it->d.lustre.it_data;
861 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
865 EXPORT_SYMBOL(mdc_intent_lock);
/* Reply callback installed by mdc_intent_getattr_async().
 *
 * The export, the md_enqueue_info and the ldlm_enqueue_info are recovered
 * from req->rq_async_args.pointer_arg[0..2]. The callback calls
 * mdc_exit_request(), completes the enqueue with ldlm_cli_enqueue_fini(),
 * then runs mdc_finish_enqueue() and mdc_finish_intent_lock() on the lock
 * handle stored in minfo->mi_lockh, and finally reports the resulting rc
 * through minfo->mi_cb(). A failing ldlm_cli_enqueue_fini() is logged and
 * the replay flag of the request is cleared.
 *
 * NOTE(review): the assignment of `it`, the effect of the
 * OBD_FAIL_MDC_GETATTR_ENQUEUE check, the conditions guarding the error
 * paths and the return statement are missing from this copy. */
867 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
868 void *unused, int rc)
870 struct obd_export *exp = req->rq_async_args.pointer_arg[0];
871 struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
872 struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
873 struct lookup_intent *it;
874 struct lustre_handle *lockh;
875 struct obd_device *obddev;
876 int flags = LDLM_FL_HAS_INTENT;
880 lockh = &minfo->mi_lockh;
882 obddev = class_exp2obd(exp);
884 mdc_exit_request(&obddev->u.cli);
885 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
888 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
889 &flags, NULL, 0, NULL, lockh, rc);
891 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
892 mdc_clear_replay_flag(req, rc);
896 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
900 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
904 minfo->mi_cb(exp, req, minfo, rc);
/* Start an asynchronous getattr/lookup intent enqueue.
 *
 * The op data and the intent are taken from @minfo (mi_data and mi_it).
 * The resource name is built from op_data->fid1, the request is packed with
 * mdc_intent_lookup_pack() and, after mdc_enter_request(), handed to
 * ldlm_cli_enqueue() with an MDS_INODELOCK_LOOKUP policy, the lock handle
 * minfo->mi_lockh and a final argument of 1. @exp, @minfo and @einfo are
 * then stored in req->rq_async_args.pointer_arg[0..2],
 * mdc_intent_getattr_async_interpret() is installed as the reply
 * interpreter and the request is queued with ptlrpcd_add_req().
 *
 * NOTE(review): the declaration of rc, the checks on `req` and on the
 * enqueue result (the visible mdc_exit_request() call presumably belongs to
 * the failure path) and the return statements are missing from this
 * copy. */
909 int mdc_intent_getattr_async(struct obd_export *exp,
910 struct md_enqueue_info *minfo,
911 struct ldlm_enqueue_info *einfo)
913 struct mdc_op_data *op_data = &minfo->mi_data;
914 struct lookup_intent *it = &minfo->mi_it;
915 struct ptlrpc_request *req;
916 struct obd_device *obddev = class_exp2obd(exp);
917 struct ldlm_res_id res_id;
918 ldlm_policy_data_t policy = {
919 .l_inodebits = { MDS_INODELOCK_LOOKUP }
922 int flags = LDLM_FL_HAS_INTENT;
925 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
926 op_data->namelen, op_data->name, op_data->fid1.id,
927 ldlm_it2str(it->it_op), it->it_flags);
929 fid_build_reg_res_name((void *)&op_data->fid1, &res_id);
930 req = mdc_intent_lookup_pack(exp, it, op_data);
934 mdc_enter_request(&obddev->u.cli);
935 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
936 0, NULL, &minfo->mi_lockh, 1);
938 mdc_exit_request(&obddev->u.cli);
942 req->rq_async_args.pointer_arg[0] = exp;
943 req->rq_async_args.pointer_arg[1] = minfo;
944 req->rq_async_args.pointer_arg[2] = einfo;
945 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
946 ptlrpcd_add_req(req);
950 EXPORT_SYMBOL(mdc_intent_getattr_async);