1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <obd_class.h>
52 #include <lustre_dlm.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/* Test disposition bits of an intent.
 * Returns the bits of it->d.lustre.it_disposition that are selected by flag,
 * i.e. a non-zero value when at least one of the requested bits is set. */
56 int it_disposition(struct lookup_intent *it, int flag)
58 return it->d.lustre.it_disposition & flag;
60 EXPORT_SYMBOL(it_disposition);
/* Set the bits given in flag in it->d.lustre.it_disposition; bits that are
 * already set are left untouched. */
62 void it_set_disposition(struct lookup_intent *it, int flag)
64 it->d.lustre.it_disposition |= flag;
66 EXPORT_SYMBOL(it_set_disposition);
/* Clear the bits given in flag from it->d.lustre.it_disposition; all other
 * bits keep their value. */
68 void it_clear_disposition(struct lookup_intent *it, int flag)
70 it->d.lustre.it_disposition &= ~flag;
72 EXPORT_SYMBOL(it_clear_disposition);
/* Map the status stored in an intent to the error of a given phase.
 * The disposition bits are tested in the order DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For a bit that is set,
 * it->d.lustre.it_status is returned when phase is greater than or equal to
 * that disposition value.
 * NOTE(review): for phase below the matched disposition the result is
 * assumed to be 0 (no error for that phase) -- confirm.
 * The CERROR at the end reports the raw disposition and status; presumably it
 * is only reached when none of the four bits is set -- confirm. */
74 int it_open_error(int phase, struct lookup_intent *it)
76 if (it_disposition(it, DISP_OPEN_OPEN)) {
77 if (phase >= DISP_OPEN_OPEN)
78 return it->d.lustre.it_status;
83 if (it_disposition(it, DISP_OPEN_CREATE)) {
84 if (phase >= DISP_OPEN_CREATE)
85 return it->d.lustre.it_status;
90 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
91 if (phase >= DISP_LOOKUP_EXECD)
92 return it->d.lustre.it_status;
97 if (it_disposition(it, DISP_IT_EXECD)) {
98 if (phase >= DISP_IT_EXECD)
99 return it->d.lustre.it_status;
103 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
104 it->d.lustre.it_status);
108 EXPORT_SYMBOL(it_open_error);
110 /* this must be called on a lockh that is known to have a referenced lock */
/* Attach data to the lock named by the handle l as its l_ast_data.
 * l is reinterpreted as a struct lustre_handle and resolved with
 * ldlm_handle2lock(); the result is asserted to be non-NULL.  The update is
 * made between lock_res_and_lock() and unlock_res_and_lock().
 * If the lock already carries a different, non-NULL l_ast_data, both values
 * are treated as struct inode pointers and the old inode is asserted to have
 * I_FREEING set in i_state; the assertion message prints both inodes.
 * NOTE(review): ldlm_handle2lock() presumably returns a referenced lock that
 * has to be released before returning -- confirm. */
111 void mdc_set_lock_data(__u64 *l, void *data)
113 struct ldlm_lock *lock;
114 struct lustre_handle *lockh = (struct lustre_handle *)l;
122 lock = ldlm_handle2lock(lockh);
124 LASSERT(lock != NULL);
125 lock_res_and_lock(lock);
127 if (lock->l_ast_data && lock->l_ast_data != data) {
128 struct inode *new_inode = data;
129 struct inode *old_inode = lock->l_ast_data;
130 LASSERTF(old_inode->i_state & I_FREEING,
131 "Found existing inode %p/%lu/%u state %lu in lock: "
132 "setting data to %p/%lu/%u\n", old_inode,
133 old_inode->i_ino, old_inode->i_generation,
135 new_inode, new_inode->i_ino, new_inode->i_generation);
138 lock->l_ast_data = data;
139 unlock_res_and_lock(lock);
144 EXPORT_SYMBOL(mdc_set_lock_data);
/* Iterate over the locks of the resource that corresponds to fid.
 * The resource name is built from fid (cast to struct lu_fid) with
 * fid_build_reg_res_name(), and ldlm_resource_iterate() is called on the
 * namespace of the obd device behind exp.
 * NOTE(review): it and data are presumably handed to
 * ldlm_resource_iterate() as the iterator callback and its argument --
 * confirm. */
146 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
147 ldlm_iterator_t it, void *data)
149 struct ldlm_res_id res_id;
152 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
153 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
/* Helper used on error paths: req is the request, rc the result obtained for
 * it.  When req->rq_replay is set, req->rq_lock is taken around an update of
 * the request (NOTE(review): presumably rq_replay is reset there -- confirm).
 * A non-zero rc combined with a non-zero req->rq_transno is reported through
 * DEBUG_REQ at D_ERROR level. */
160 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
162 /* Don't hold error requests for replay. */
163 if (req->rq_replay) {
164 spin_lock(&req->rq_lock);
166 spin_unlock(&req->rq_lock);
168 if (rc && req->rq_transno != 0) {
169 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/* NOTE(review): mdc_intent_open_pack() uses round_up(rc) - rc as the amount
 * of padding to add to a message of size rc, and its comment speaks of
 * growing the request to the next power-of-two size; presumably this helper
 * rounds val up to the next power of two -- confirm against the body. */
174 static int round_up(int val)
184 /* Save a large LOV EA into the request buffer so that it is available
185 * for replay. We don't do this in the initial request because the
186 * original request doesn't need this buffer (at most it sends just the
187 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
188 * buffer and may also be difficult to allocate and save a very large
189 * request buffer for each open. (bug 5707)
191 * OOM here may cause recovery failure if lmm is needed (only for the
192 * original open if the MDS crashed just when this client also OOM'd)
193 * but this is incredibly unlikely, and questionable whether the client
194 * could do MDS recovery under OOM anyways... */
/* req is the open request whose message is regrown; body is the mds_body
 * taken from its reply.
 * The length of request buffer DLM_INTENT_REC_OFF + 2 is changed in the old
 * message and the packed size is recomputed to size a fresh allocation.
 * When the allocation succeeds, the first old_size bytes are copied over,
 * rq_reqmsg and rq_reqlen are switched under rq_lock and the old message is
 * freed.
 * NOTE(review): the trailing statements, which restore old_len and clear
 * OBD_MD_FLEASIZE / eadatasize in body, look like the allocation-failure
 * path -- confirm. */
195 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
196 struct mds_body *body)
198 int old_len, new_size, old_size;
199 struct lustre_msg *old_msg = req->rq_reqmsg;
200 struct lustre_msg *new_msg;
202 old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
203 old_size = lustre_packed_msg_size(old_msg);
204 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
206 new_size = lustre_packed_msg_size(old_msg);
208 OBD_ALLOC(new_msg, new_size);
209 if (new_msg != NULL) {
210 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
212 memcpy(new_msg, old_msg, old_size);
214 spin_lock(&req->rq_lock);
215 req->rq_reqmsg = new_msg;
216 req->rq_reqlen = new_size;
217 spin_unlock(&req->rq_lock);
219 OBD_FREE(old_msg, old_size);
221 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
222 body->valid &= ~OBD_MD_FLEASIZE;
223 body->eadatasize = 0;
/* Build the LDLM enqueue request that carries an open intent.
 *
 * Request buffers are described by size[]: ptlrpc body, ldlm_request, the
 * intent, the create record, the name (data->namelen + 1 bytes) and an EA
 * buffer of max(lmmsize, cl_default_mds_easize) bytes.  For a 2.0 server the
 * create record is a struct mdt_rec_create, the name and EA buffers move up
 * by two slots, the two freed slots become zero-length capa buffers, and two
 * struct lustre_capa buffers are added to the reply sizes.
 * The last request buffer is then padded by round_up(rc) - rc, where rc is
 * the value returned by lustre_msg_size(), capped at cl_max_mds_easize.
 *
 * Unused locks collected by mdc_resource_get_unused() (MDS_INODELOCK_OPEN on
 * data->fid2, MDS_INODELOCK_UPDATE on data->fid1) are handed to
 * ldlm_prep_enqueue_req() through the local cancels list.
 *
 * Once the request exists, the intent opcode is set from it->it_op, the open
 * record is packed with mdc_open_pack() and the reply buffer sizes are set
 * with ptlrpc_req_set_repsize().
 *
 * Side effects on it: the file-type bits of it_create_mode are replaced by
 * S_IFREG.  NOTE(review): O_JOIN_FILE appears to be cleared from it_flags on
 * the non-join path only -- confirm the branch structure. */
227 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
228 struct lookup_intent *it,
229 struct mdc_op_data *data,
230 void *lmm, __u32 lmmsize)
232 struct ptlrpc_request *req;
233 struct ldlm_intent *lit;
234 struct obd_device *obddev = class_exp2obd(exp);
235 __u32 size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
236 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
237 [DLM_INTENT_IT_OFF] = sizeof(*lit),
238 [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create),
239 [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
240 /* As an optimization, we allocate an RPC request buffer
241 * for at least a default-sized LOV EA even if we aren't
242 * sending one. We grow the whole request to the next
243 * power-of-two size since we get that much from a slab
244 * allocation anyways. This avoids an allocation below
245 * in the common case where we need to save a
246 * default-sized LOV EA for open replay. */
247 [DLM_INTENT_REC_OFF+2]= max(lmmsize,
248 obddev->u.cli.cl_default_mds_easize) };
249 __u32 repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
250 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
251 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
252 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
254 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
255 CFS_LIST_HEAD(cancels);
256 int do_join = (it->it_flags & O_JOIN_FILE) && data->data;
264 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
265 if (mdc_exp_is_2_0_server(exp)) {
266 size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create);
267 size[DLM_INTENT_REC_OFF+4] = size[DLM_INTENT_REC_OFF+2];
268 size[DLM_INTENT_REC_OFF+3] = size[DLM_INTENT_REC_OFF+1];
269 size[DLM_INTENT_REC_OFF+2] = 0; /* capa */
270 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
272 repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa);
273 repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa);
276 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
279 size[bufcount - 1] = min(size[bufcount - 1] + round_up(rc) - rc,
280 obddev->u.cli.cl_max_mds_easize);
282 /* If inode is known, cancel conflicting OPEN locks. */
284 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
287 else if (it->it_flags & FMODE_EXEC)
292 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
293 mode, MDS_INODELOCK_OPEN);
296 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
297 if (it->it_op & IT_CREAT || do_join)
301 count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
302 MDS_INODELOCK_UPDATE);
304 __u64 head_size = (*(__u64 *)data->data);
305 /* join is like an unlink of the tail */
306 if (mdc_exp_is_2_0_server(exp)) {
307 size[DLM_INTENT_REC_OFF+5]=sizeof(struct mdt_rec_join);
309 size[DLM_INTENT_REC_OFF+3]=sizeof(struct mds_rec_join);
313 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
315 mdc_join_pack(req, bufcount - 1, data, head_size);
317 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
318 it->it_flags &= ~O_JOIN_FILE;
322 spin_lock(&req->rq_lock);
324 spin_unlock(&req->rq_lock);
326 /* pack the intent */
327 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
329 lit->opc = (__u64)it->it_op;
331 /* pack the intended request */
332 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
333 it->it_create_mode, 0, it->it_flags,
336 ptlrpc_req_set_repsize(req, repbufcount, repsize);
/* Build the LDLM enqueue request that carries an unlink intent.
 * The request has five buffers: ptlrpc body, ldlm_request, the intent, the
 * unlink record (struct mdt_rec_unlink for a 2.0 server, struct
 * mds_rec_unlink otherwise) and the name (data->namelen + 1 bytes).  No locks
 * are passed for cancellation (NULL list, count 0).
 * The intent opcode is set from it->it_op, the record is packed with
 * mdc_unlink_pack(), and five reply buffers are sized from repsize[], the
 * last one being cl_max_mds_cookiesize bytes. */
341 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
342 struct lookup_intent *it,
343 struct mdc_op_data *data)
345 struct ptlrpc_request *req;
346 struct ldlm_intent *lit;
347 struct obd_device *obddev = class_exp2obd(exp);
348 __u32 size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
349 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
350 [DLM_INTENT_IT_OFF] = sizeof(*lit),
351 [DLM_INTENT_REC_OFF] = mdc_exp_is_2_0_server(exp) ?
352 sizeof(struct mdt_rec_unlink) :
353 sizeof(struct mds_rec_unlink),
354 [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
355 __u32 repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
356 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
357 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
358 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
360 [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
361 cl_max_mds_cookiesize };
364 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
366 /* pack the intent */
367 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
369 lit->opc = (__u64)it->it_op;
371 /* pack the intended request */
372 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
374 ptlrpc_req_set_repsize(req, 5, repsize);
/* Build the LDLM enqueue request that carries a lookup/getattr intent.
 * Request buffers: ptlrpc body, ldlm_request, the intent, a struct mdt_body
 * and the name (data->namelen + 1 bytes).  For a 2.0 server the name moves
 * one slot up and the slot after the body becomes a zero-length capa buffer.
 * No locks are passed for cancellation (NULL list, count 0).
 * The intent opcode is set from it->it_op and the getattr body is packed
 * with mdc_getattr_pack() using the valid mask built here (GETATTR, EA
 * size, ACL, MODEASIZE, DIREA) and it->it_flags.  The reply buffer sizes come
 * from repsize[] and use the same buffer count as the request. */
379 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
380 struct lookup_intent *it,
381 struct mdc_op_data *data)
383 struct ptlrpc_request *req;
384 struct ldlm_intent *lit;
385 struct obd_device *obddev = class_exp2obd(exp);
386 __u32 size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
387 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
388 [DLM_INTENT_IT_OFF] = sizeof(*lit),
389 [DLM_INTENT_REC_OFF] = sizeof(struct mdt_body),
390 [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
391 [DLM_INTENT_REC_OFF+2]= 0 };
392 __u32 repsize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
393 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
394 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
395 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
397 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
398 [DLM_REPLY_REC_OFF+3] = 0 };
399 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
400 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
404 if (mdc_exp_is_2_0_server(exp)) {
405 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
406 size[DLM_INTENT_REC_OFF+2] = data->namelen + 1;
409 req = ldlm_prep_enqueue_req(exp, bufcount, size, NULL, 0);
411 /* pack the intent */
412 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
414 lit->opc = (__u64)it->it_op;
416 /* pack the intended request */
417 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
419 ptlrpc_req_set_repsize(req, bufcount, repsize);
/* Build the LDLM enqueue request used for readdir.
 * The request has only two buffers (ptlrpc body and ldlm_request); no
 * intent buffer is sized or packed here.  The reply is sized for three
 * buffers: ptlrpc body, ldlm_reply and a struct ost_lvb.  No locks are
 * passed for cancellation (NULL list, count 0). */
424 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
426 struct ptlrpc_request *req;
427 __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
428 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
429 __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
430 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
431 [DLM_REPLY_REC_OFF] = sizeof(struct ost_lvb) };
434 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
436 ptlrpc_req_set_repsize(req, 3, repsize);
/* Post-process the reply of an intent enqueue.  Callers pass the enqueue
 * result as the last argument (rc).
 *
 * Steps visible here, in order:
 *  - when the request has a transno or is marked for replay,
 *    LDLM_FL_INTENT_ONLY is set in the lock request stored in the request
 *    message;
 *  - for rc == ELDLM_LOCK_ABORTED the lock handle is zeroed; otherwise, when
 *    the lock's l_req_mode differs from einfo->ei_mode, the reference is
 *    moved to that mode and einfo->ei_mode is updated;
 *  - disposition and status are taken from lock_policy_res1/res2 of the
 *    lock reply and stored in the intent together with the lock mode and the
 *    request itself (it_data);
 *  - the replay flag is dropped through mdc_clear_replay_flag() for a
 *    negative status on a replayable request, and for an IT_OPEN that did not
 *    end in a successful open;
 *  - for OPEN/UNLINK/LOOKUP/GETATTR the reply mds_body is swabbed; a
 *    successful open gets its replay data set via mdc_set_open_replay_data();
 *  - when the body announces EA data (OBD_MD_FLEASIZE) the EA buffer must be
 *    present, and for IT_OPEN it is copied into request buffer
 *    DLM_INTENT_REC_OFF + 2, growing the request with mdc_realloc_openmsg()
 *    when that buffer is too small.
 * NOTE(review): the error codes returned for a missing body or EA buffer are
 * assumed to be negative errno values -- confirm. */
440 static int mdc_finish_enqueue(struct obd_export *exp,
441 struct ptlrpc_request *req,
442 struct ldlm_enqueue_info *einfo,
443 struct lookup_intent *it,
444 struct lustre_handle *lockh,
447 struct ldlm_request *lockreq;
448 struct ldlm_reply *lockrep;
452 /* Similarly, if we're going to replay this request, we don't want to
453 * actually get a lock, just perform the intent. */
454 if (req->rq_transno || req->rq_replay) {
455 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
457 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
460 if (rc == ELDLM_LOCK_ABORTED) {
462 memset(lockh, 0, sizeof(*lockh));
464 } else { /* rc = 0 */
465 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
468 /* If the server gave us back a different lock mode, we should
469 * fix up our variables. */
470 if (lock->l_req_mode != einfo->ei_mode) {
471 ldlm_lock_addref(lockh, lock->l_req_mode);
472 ldlm_lock_decref(lockh, einfo->ei_mode);
473 einfo->ei_mode = lock->l_req_mode;
478 lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
480 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
481 /* swabbed by ldlm_cli_enqueue() */
482 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
484 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
485 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
486 it->d.lustre.it_lock_mode = einfo->ei_mode;
487 it->d.lustre.it_data = req;
489 if (it->d.lustre.it_status < 0 && req->rq_replay)
490 mdc_clear_replay_flag(req, it->d.lustre.it_status);
492 /* If we're doing an IT_OPEN which did not result in an actual
493 * successful open, then we need to remove the bit which saves
494 * this request for unconditional replay.
496 * It's important that we do this first! Otherwise we might exit the
497 * function without doing so, and try to replay a failed create
499 if ((it->it_op & IT_OPEN) &&
501 (!it_disposition(it, DISP_OPEN_OPEN) ||
502 it->d.lustre.it_status != 0))
503 mdc_clear_replay_flag(req, it->d.lustre.it_status);
505 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
506 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
508 /* We know what to expect, so we do any byte flipping required here */
509 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
510 struct mds_body *body;
512 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
513 lustre_swab_mds_body);
515 CERROR ("Can't swab mds_body\n");
519 /* If this is a successful OPEN request, we need to set
520 replay handler and data early, so that if replay happens
521 immediately after swabbing below, new reply is swabbed
522 by that handler correctly */
523 if (it_disposition(it, DISP_OPEN_OPEN) &&
524 !it_open_error(DISP_OPEN_OPEN, it))
525 mdc_set_open_replay_data(NULL, req);
527 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
530 /* The eadata is opaque; just check that it is there.
531 * Eventually, obd_unpackmd() will check the contents */
532 eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
533 body->eadatasize, NULL);
534 if (eadata == NULL) {
535 CERROR ("Missing/short eadata\n");
538 /* We save the reply LOV EA in case we have to replay
539 * a create for recovery. If we didn't allocate a
540 * large enough request buffer above we need to
541 * reallocate it here to hold the actual LOV EA. */
542 if (it->it_op & IT_OPEN) {
543 int offset = DLM_INTENT_REC_OFF + 2;
546 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
548 mdc_realloc_openmsg(req, body);
550 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
553 memcpy(lmm, eadata, body->eadatasize);
561 /* We always reserve enough space in the reply packet for a stripe MD, because
562 * we don't know in advance the file type. */
/* Send an intent enqueue for data->fid1 and post-process the reply.
 *
 * The resource name is built from data->fid1; einfo->ei_type must be
 * LDLM_IBITS (asserted).  The inodebits policy starts as
 * MDS_INODELOCK_LOOKUP and becomes MDS_INODELOCK_UPDATE for
 * UNLINK/GETATTR/READDIR intents and for an open with O_JOIN_FILE.
 * The request is built by the packer matching it->it_op (open, unlink,
 * lookup/getattr, readdir); any other opcode is reported with CERROR.
 * For IT_OPEN with IT_CREAT against a 2.0 server, data->fid2 is saved in
 * data->fid3 and a new fid is allocated into data->fid2 with mdc_fid_alloc().
 *
 * The RPC is issued with ldlm_cli_enqueue() between mdc_get_rpc_lock() /
 * mdc_enter_request() and mdc_exit_request() / mdc_put_rpc_lock().  On the
 * error path the failure is logged, the replay flag is cleared and the
 * request is released; otherwise mdc_finish_enqueue() handles the reply.
 * NOTE(review): flags combines extra_lock_flags with LDLM_FL_HAS_INTENT;
 * lmm/lmmsize are only forwarded to mdc_intent_open_pack(). */
563 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
564 struct lookup_intent *it, struct mdc_op_data *data,
565 struct lustre_handle *lockh, void *lmm, int lmmsize,
566 int extra_lock_flags)
568 struct ptlrpc_request *req;
569 struct obd_device *obddev = class_exp2obd(exp);
570 struct ldlm_res_id res_id;
571 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
572 int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
576 fid_build_reg_res_name((void *)&data->fid1, &res_id);
577 LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
578 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
579 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
581 if (it->it_op & IT_OPEN) {
582 if ((it->it_op & IT_CREAT) && mdc_exp_is_2_0_server(exp)) {
583 struct client_obd *cli = &obddev->u.cli;
584 data->fid3 = data->fid2;
585 rc = mdc_fid_alloc(cli->cl_seq, (void *)&data->fid2);
587 CERROR("fid allocation result: %d\n", rc);
591 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
592 if (it->it_flags & O_JOIN_FILE) {
593 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
595 } else if (it->it_op & IT_UNLINK) {
596 req = mdc_intent_unlink_pack(exp, it, data);
597 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
598 req = mdc_intent_lookup_pack(exp, it, data);
599 } else if (it->it_op == IT_READDIR) {
600 req = mdc_intent_readdir_pack(exp);
602 CERROR("bad it_op %x\n", it->it_op);
609 /* It is important to obtain rpc_lock first (if applicable), so that
610 * threads that are serialised with rpc_lock are not polluting our
611 * rpcs in flight counter */
612 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
613 mdc_enter_request(&obddev->u.cli);
614 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
616 mdc_exit_request(&obddev->u.cli);
617 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
619 CERROR("ldlm_cli_enqueue: %d\n", rc);
620 mdc_clear_replay_flag(req, rc);
621 ptlrpc_req_finished(req);
624 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
628 EXPORT_SYMBOL(mdc_enqueue);
/* Look for an already granted inodebits lock covering a fid (callers pass
 * the fid as third argument).
 * The resource name is built from that fid.  The bits asked for are
 * MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP when it->it_op is IT_GETATTR
 * and MDS_INODELOCK_LOOKUP otherwise.  ldlm_lock_match() is called on the
 * export's namespace with LDLM_FL_BLOCK_GRANTED for any of the modes CR, CW,
 * PR, PW; the handle and the mode it yields are stored in
 * it->d.lustre.it_lock_handle and it->d.lustre.it_lock_mode.
 * NOTE(review): the return value is presumably non-zero when a lock was
 * matched -- confirm. */
630 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
633 /* We could just return 1 immediately, but since we should only
634 * be called in revalidate_it if we already have a lock, let's
636 struct ldlm_res_id res_id;
637 struct lustre_handle lockh;
638 ldlm_policy_data_t policy;
642 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
643 /* As not all attributes are kept under update lock, e.g.
644 owner/group/acls are under lookup lock, we need both
645 ibits for GETATTR. */
646 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
647 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
648 MDS_INODELOCK_LOOKUP;
650 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
651 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
652 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
654 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
655 it->d.lustre.it_lock_mode = mode;
660 EXPORT_SYMBOL(mdc_revalidate_lock);
/* Second stage of intent processing, run on a request whose reply has
 * already been unpacked into the intent.
 *
 * Steps visible here, in order:
 *  - sanity assertions on req and its reply message;
 *  - without DISP_IT_EXECD the (asserted non-zero) it_status is returned
 *    right away; otherwise the DISP_IT_EXECD phase error is fetched with
 *    it_open_error();
 *  - the reply mds_body is located and asserted present and swabbed;
 *  - when data->fid2.id is set, the operation is not IT_GETATTR and either
 *    the server is pre-2.0 or O_CHECK_STALE was requested,
 *    DISP_ENQ_COMPLETE is set and the fid in the reply is compared with
 *    data->fid2 and data->fid3 (NOTE(review): a reply matching neither is
 *    presumably turned into a stale-handle error -- confirm);
 *  - the DISP_LOOKUP_EXECD phase error is fetched;
 *  - an extra request reference is taken once for a successful create
 *    (DISP_ENQ_CREATE_REF) and once for a successful open
 *    (DISP_ENQ_OPEN_REF);
 *  - the operation type is cross-checked with assertions;
 *  - when lockh names a lock and ldlm_lock_match() finds another lock with
 *    the same policy, the new lock is dropped with
 *    ldlm_lock_decref_and_cancel() and both lockh and
 *    it->d.lustre.it_lock_handle are pointed at the lock that was found;
 *  - the outcome is traced with CDEBUG. */
662 static int mdc_finish_intent_lock(struct obd_export *exp,
663 struct ptlrpc_request *req,
664 struct mdc_op_data *data,
665 struct lookup_intent *it,
666 struct lustre_handle *lockh)
668 struct mds_body *mds_body;
669 struct lustre_handle old_lock;
670 struct ldlm_lock *lock;
674 LASSERT(req != NULL);
675 LASSERT(req != LP_POISON);
676 LASSERT(req->rq_repmsg != LP_POISON);
678 if (!it_disposition(it, DISP_IT_EXECD)) {
679 /* The server failed before it even started executing the
680 * intent, i.e. because it couldn't unpack the request. */
681 LASSERT(it->d.lustre.it_status != 0);
682 RETURN(it->d.lustre.it_status);
684 rc = it_open_error(DISP_IT_EXECD, it);
688 mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
690 /* mdc_enqueue checked */
691 LASSERT(mds_body != NULL);
692 /* mdc_enqueue swabbed */
693 LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
695 /* If we were revalidating a fid/name pair, mark the intent in
696 * case we fail and get called again from lookup */
698 if (data->fid2.id && (it->it_op != IT_GETATTR) &&
699 ( !mdc_exp_is_2_0_server(exp) ||
700 (mdc_exp_is_2_0_server(exp) && (it->it_flags & O_CHECK_STALE)))) {
701 it_set_disposition(it, DISP_ENQ_COMPLETE);
703 /* Also: did we find the same inode? */
704 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)) &&
705 memcmp(&data->fid3, &mds_body->fid1, sizeof(data->fid3)))
709 rc = it_open_error(DISP_LOOKUP_EXECD, it);
713 /* keep requests around for the multiple phases of the call
714 * this shows the DISP_XX must guarantee we make it into the call
716 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
717 it_disposition(it, DISP_OPEN_CREATE) &&
718 !it_open_error(DISP_OPEN_CREATE, it)) {
719 it_set_disposition(it, DISP_ENQ_CREATE_REF);
720 ptlrpc_request_addref(req); /* balanced in ll_create_node */
722 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
723 it_disposition(it, DISP_OPEN_OPEN) &&
724 !it_open_error(DISP_OPEN_OPEN, it)) {
725 it_set_disposition(it, DISP_ENQ_OPEN_REF);
726 ptlrpc_request_addref(req); /* balanced in ll_file_open */
727 /* BUG 11546 - eviction in the middle of open rpc processing */
728 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
731 if (it->it_op & IT_CREAT) {
732 /* XXX this belongs in ll_create_it */
733 } else if (it->it_op == IT_OPEN) {
734 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
736 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
739 /* If we already have a matching lock, then cancel the new
740 * one. We have to set the data here instead of in
741 * mdc_enqueue, because we need to use the child's inode as
742 * the l_ast_data to match, and that's not available until
743 * intent_finish has performed the iget().) */
744 lock = ldlm_handle2lock(lockh);
746 ldlm_policy_data_t policy = lock->l_policy_data;
748 LDLM_DEBUG(lock, "matching against this");
750 memcpy(&old_lock, lockh, sizeof(*lockh));
751 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
752 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
753 ldlm_lock_decref_and_cancel(lockh,
754 it->d.lustre.it_lock_mode);
755 memcpy(lockh, &old_lock, sizeof(old_lock));
756 memcpy(&it->d.lustre.it_lock_handle, lockh,
761 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
762 data->namelen, data->name, ldlm_it2str(it->it_op),
763 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
768 /* This long block is all about fixing up the lock and request state
769 * so that it is correct as of the moment _before_ the operation was
770 * applied; that way, the VFS will think that everything is normal and
771 * call Lustre's regular VFS methods.
773 * If we're performing a creation, that means that unless the creation
774 * failed with EEXIST, we should fake up a negative dentry.
776 * For everything else, we want the lookup to succeed.
778 * One additional note: if CREATE or OPEN succeeded, we add an extra
779 * reference to the request because we need to keep it around until
780 * ll_create/ll_open gets called.
782 * The server will return to us, in it_disposition, an indication of
783 * exactly what d.lustre.it_status refers to.
785 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
786 * otherwise if DISP_OPEN_CREATE is set, then d.lustre.it_status is the
787 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
788 * DISP_LOOKUP_POS will be set, indicating whether the child lookup was successful.
791 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the child lookup. */
/* Entry point for executing an intent on the name op_data->name in the
 * directory op_data->fid1, optionally revalidating the child op_data->fid2.
 *
 * - When op_data->fid2.id is set and the operation is IT_LOOKUP or
 *   IT_GETATTR, an existing lock is searched for with mdc_revalidate_lock()
 *   first; its result decides, together with op_data->namelen, whether the
 *   function can finish without sending an RPC.
 * - Unless DISP_ENQ_COMPLETE is already set in the intent, a new enqueue is
 *   sent with mdc_enqueue(), using an LDLM_IBITS lock of mode
 *   it_to_lock_mode(it), cb_blocking as blocking callback and
 *   ldlm_completion_ast as completion callback; the resulting handle is
 *   copied into it->d.lustre.it_lock_handle.
 * - With DISP_ENQ_COMPLETE set and no op_data->fid2.id, the flag is cleared
 *   and the request saved in the intent is reused.
 * - *reqp is set to the request stored in it->d.lustre.it_data and the result
 *   of mdc_finish_intent_lock() becomes rc.
 * NOTE(review): lookup_flags is not referenced in the statements shown
 * here -- confirm it is unused. */
794 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
795 void *lmm, int lmmsize, struct lookup_intent *it,
796 int lookup_flags, struct ptlrpc_request **reqp,
797 ldlm_blocking_callback cb_blocking, int extra_lock_flags)
799 struct lustre_handle lockh;
805 CDEBUG(D_DLMTRACE,"name: %.*s("DFID") in inode ("DFID"), "
806 "intent: %s flags %#o\n",
807 op_data->namelen, op_data->name,
808 PFID(((void *)&op_data->fid2)),
809 PFID(((void *)&op_data->fid1)),
810 ldlm_it2str(it->it_op), it->it_flags);
813 if (op_data->fid2.id &&
814 (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
815 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
816 /* Only return failure if it was not GETATTR by cfid
817 (from inode_revalidate) */
818 if (rc || op_data->namelen != 0)
822 /* lookup_it may be called only after revalidate_it has run, because
823 * revalidate_it cannot return errors, only zero. Returning zero causes
824 * this call to lookup, which *can* return an error.
826 * We only want to execute the request associated with the intent one
827 * time, however, so don't send the request again. Instead, skip past
828 * this and use the request from revalidate. In this case, revalidate
829 * never dropped its reference, so the refcounts are all OK */
830 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
831 struct ldlm_enqueue_info einfo =
832 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
833 ldlm_completion_ast, NULL, NULL };
835 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
836 lmm, lmmsize, extra_lock_flags);
839 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
840 } else if (!op_data->fid2.id) {
841 /* DISP_ENQ_COMPLETE set means there is extra reference on
842 * request referenced from this intent, saved for subsequent
843 * lookup. This path is executed when we proceed to this
844 * lookup, so we clear DISP_ENQ_COMPLETE */
845 it_clear_disposition(it, DISP_ENQ_COMPLETE);
848 *reqp = it->d.lustre.it_data;
849 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
853 EXPORT_SYMBOL(mdc_intent_lock);
/* Reply handler for requests queued by mdc_intent_getattr_async().
 * The export, the md_enqueue_info and the ldlm_enqueue_info are read back
 * from req->rq_async_args.pointer_arg[0..2]; the lock handle used is
 * minfo->mi_lockh.
 * The in-flight slot is released with mdc_exit_request(), then the enqueue
 * is completed with ldlm_cli_enqueue_fini().  On the error path the failure
 * is logged and the replay flag is cleared; otherwise mdc_finish_enqueue()
 * and mdc_finish_intent_lock() are run and the lock handle is copied into
 * the intent.  minfo->mi_cb() is called with the final rc.
 * NOTE(review): it is presumably &minfo->mi_it, and
 * OBD_FAIL_MDC_GETATTR_ENQUEUE presumably forces an error rc for fault
 * injection -- confirm both. */
855 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
856 void *unused, int rc)
858 struct obd_export *exp = req->rq_async_args.pointer_arg[0];
859 struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
860 struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
861 struct lookup_intent *it;
862 struct lustre_handle *lockh;
863 struct obd_device *obddev;
864 int flags = LDLM_FL_HAS_INTENT;
868 lockh = &minfo->mi_lockh;
870 obddev = class_exp2obd(exp);
872 mdc_exit_request(&obddev->u.cli);
873 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
876 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
877 &flags, NULL, 0, NULL, lockh, rc);
879 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
880 mdc_clear_replay_flag(req, rc);
884 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
888 memcpy(&it->d.lustre.it_lock_handle, lockh, sizeof(*lockh));
890 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
894 minfo->mi_cb(exp, req, minfo, rc);
/* Start an asynchronous lookup/getattr intent enqueue.
 * The operation data and the intent are taken from minfo (mi_data, mi_it).
 * The resource name is built from op_data->fid1 and the request is packed
 * with mdc_intent_lookup_pack().  ldlm_cli_enqueue() is called between
 * mdc_enter_request() and, on the failure path, mdc_exit_request(); its last
 * argument is 1 and the lock handle is minfo->mi_lockh.
 * On success exp, minfo and einfo are stored in the request's async args,
 * mdc_intent_getattr_async_interpret() is installed as reply handler and
 * the request is handed to ptlrpcd_add_req().
 * NOTE(review): minfo and einfo are read again by the reply handler, so
 * they presumably must stay valid until minfo->mi_cb has run -- confirm. */
899 int mdc_intent_getattr_async(struct obd_export *exp,
900 struct md_enqueue_info *minfo,
901 struct ldlm_enqueue_info *einfo)
903 struct mdc_op_data *op_data = &minfo->mi_data;
904 struct lookup_intent *it = &minfo->mi_it;
905 struct ptlrpc_request *req;
906 struct obd_device *obddev = class_exp2obd(exp);
907 struct ldlm_res_id res_id;
908 ldlm_policy_data_t policy = {
909 .l_inodebits = { MDS_INODELOCK_LOOKUP }
912 int flags = LDLM_FL_HAS_INTENT;
915 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
916 op_data->namelen, op_data->name, op_data->fid1.id,
917 ldlm_it2str(it->it_op), it->it_flags);
919 fid_build_reg_res_name((void *)&op_data->fid1, &res_id);
920 req = mdc_intent_lookup_pack(exp, it, op_data);
924 mdc_enter_request(&obddev->u.cli);
925 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
926 0, NULL, &minfo->mi_lockh, 1);
928 mdc_exit_request(&obddev->u.cli);
/* Stash what mdc_intent_getattr_async_interpret() reads back from the
 * request when the reply arrives. */
932 req->rq_async_args.pointer_arg[0] = exp;
933 req->rq_async_args.pointer_arg[1] = minfo;
934 req->rq_async_args.pointer_arg[2] = einfo;
935 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
936 ptlrpcd_add_req(req);
940 EXPORT_SYMBOL(mdc_intent_getattr_async);