1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 # define EXPORT_SYMTAB
40 #define DEBUG_SUBSYSTEM S_MDC
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
48 # include <liblustre.h>
51 #include <obd_class.h>
52 #include <lustre_dlm.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
/*
 * it_open_error() - pick the intent status that applies to a given phase.
 *
 * The intent's disposition bits are tested in the order DISP_OPEN_OPEN,
 * DISP_OPEN_CREATE, DISP_LOOKUP_EXECD, DISP_IT_EXECD.  For a disposition
 * that is set, it->d.lustre.it_status is returned when @phase is at or
 * beyond that disposition.  Execution that gets past all four tests logs
 * the raw disposition and status with CERROR().
 *
 * NOTE(review): the branches taken when @phase is below a set disposition,
 * and whatever follows the CERROR(), are not visible in this copy of the
 * file -- confirm against the full source before relying on them.
 */
56 int it_open_error(int phase, struct lookup_intent *it)
58 if (it_disposition(it, DISP_OPEN_OPEN)) {
59 if (phase >= DISP_OPEN_OPEN)
60 return it->d.lustre.it_status;
65 if (it_disposition(it, DISP_OPEN_CREATE)) {
66 if (phase >= DISP_OPEN_CREATE)
67 return it->d.lustre.it_status;
72 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
73 if (phase >= DISP_LOOKUP_EXECD)
74 return it->d.lustre.it_status;
79 if (it_disposition(it, DISP_IT_EXECD)) {
80 if (phase >= DISP_IT_EXECD)
81 return it->d.lustre.it_status;
85 CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
86 it->d.lustre.it_status);
90 EXPORT_SYMBOL(it_open_error);
/*
 * mdc_set_lock_data() - attach caller data to a lock and report its ibits.
 *
 * @l is reinterpreted as a struct lustre_handle and resolved with
 * ldlm_handle2lock(); the result is asserted to be non-NULL.  With the
 * resource and lock held (lock_res_and_lock), the lock's l_ast_data is set
 * to @data and the lock's inodebits are stored through @bits.
 *
 * If l_ast_data is already set to something other than @data, both values
 * are treated as struct inode pointers and LASSERTF() requires the old
 * inode to have I_FREEING set in i_state.
 *
 * NOTE(review): any guard around the store through @bits, and the release
 * of the reference taken by ldlm_handle2lock(), are not visible in this
 * copy -- confirm against the full source.
 */
92 /* this must be called on a lockh that is known to have a referenced lock */
93 void mdc_set_lock_data(__u64 *l, void *data, __u32 *bits)
95 struct ldlm_lock *lock;
96 struct lustre_handle *lockh = (struct lustre_handle *)l;
107 lock = ldlm_handle2lock(lockh);
109 LASSERT(lock != NULL);
110 lock_res_and_lock(lock);
112 if (lock->l_ast_data && lock->l_ast_data != data) {
113 struct inode *new_inode = data;
114 struct inode *old_inode = lock->l_ast_data;
115 LASSERTF(old_inode->i_state & I_FREEING,
116 "Found existing inode %p/%lu/%u state %lu in lock: "
117 "setting data to %p/%lu/%u\n", old_inode,
118 old_inode->i_ino, old_inode->i_generation,
120 new_inode, new_inode->i_ino, new_inode->i_generation);
123 lock->l_ast_data = data;
125 *bits = lock->l_policy_data.l_inodebits.bits;
126 unlock_res_and_lock(lock);
131 EXPORT_SYMBOL(mdc_set_lock_data);
/*
 * mdc_change_cbdata() - iterate over the locks of the resource named by @fid.
 *
 * A resource id is built from @fid (cast to struct lu_fid) with
 * fid_build_reg_res_name() and handed to ldlm_resource_iterate() together
 * with the namespace of the obd device behind @exp.
 *
 * NOTE(review): the trailing arguments of ldlm_resource_iterate()
 * (presumably @it and @data) and the return statement are not visible in
 * this copy -- confirm against the full source.
 */
133 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
134 ldlm_iterator_t it, void *data)
136 struct ldlm_res_id res_id;
139 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
140 ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
/*
 * mdc_clear_replay_flag() - stop holding a failed request for replay.
 *
 * When req->rq_replay is set, req->rq_lock is taken and released around an
 * update of the request.  Independently, a non-zero @rc combined with a
 * non-zero req->rq_transno is reported through DEBUG_REQ(D_ERROR, ...).
 *
 * NOTE(review): the statement executed while rq_lock is held (presumably
 * the one that clears rq_replay) and anything after the DEBUG_REQ() are not
 * visible in this copy -- confirm against the full source.
 */
147 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
149 /* Don't hold error requests for replay. */
150 if (req->rq_replay) {
151 spin_lock(&req->rq_lock);
153 spin_unlock(&req->rq_lock);
155 if (rc && req->rq_transno != 0) {
156 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
/*
 * round_up() - size-rounding helper used by mdc_intent_open_pack(), which
 * adds "round_up(rc) - rc" to the last request buffer.
 *
 * NOTE(review): the body is not visible in this copy.  The comment in
 * mdc_intent_open_pack() speaks of growing the request to the next
 * power-of-two size, so this presumably rounds @val up to a power of two --
 * confirm against the full source.
 */
161 static int round_up(int val)
171 /* Save a large LOV EA into the request buffer so that it is available
172 * for replay. We don't do this in the initial request because the
173 * original request doesn't need this buffer (at most it sends just the
174 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
175 * buffer and may also be difficult to allocate and save a very large
176 * request buffer for each open. (bug 5707)
178 * OOM here may cause recovery failure if lmm is needed (only for the
179 * original open if the MDS crashed just when this client also OOM'd)
180 * but this is incredibly unlikely, and questionable whether the client
181 * could do MDS recovery under OOM anyways... */
/*
 * Steps visible below:
 *  - remember the current length of buffer DLM_INTENT_REC_OFF + offset and
 *    the current packed size of the request message;
 *  - change that buffer's recorded length and recompute the packed size;
 *  - allocate a message of the new size; on success copy old_size bytes
 *    from the old message, swap rq_reqmsg/rq_reqlen under rq_lock and free
 *    the old message;
 *  - otherwise restore the old buffer length and drop the EA from @body
 *    (OBD_MD_FLEASIZE cleared, eadatasize zeroed).
 *
 * NOTE(review): the value of "offset", the new buffer length passed to
 * lustre_msg_set_buflen(), the action taken when the old buffer is already
 * large enough, and the "else" that separates the success and failure
 * paths are not visible in this copy -- confirm against the full source.
 */
182 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
183 struct mds_body *body)
185 int old_len, new_size, old_size;
186 struct lustre_msg *old_msg = req->rq_reqmsg;
187 struct lustre_msg *new_msg;
190 if (mdc_req_is_2_0_server(req))
195 old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + offset);
196 old_size = lustre_packed_msg_size(old_msg);
197 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + offset,
199 /* old buffer is larger than needed */
200 if (old_len > body->eadatasize)
203 new_size = lustre_packed_msg_size(old_msg);
205 OBD_ALLOC(new_msg, new_size);
206 if (new_msg != NULL) {
207 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
209 memcpy(new_msg, old_msg, old_size);
211 spin_lock(&req->rq_lock);
212 req->rq_reqmsg = new_msg;
213 req->rq_reqlen = new_size;
214 spin_unlock(&req->rq_lock);
216 OBD_FREE(old_msg, old_size);
218 lustre_msg_set_buflen(old_msg,
219 DLM_INTENT_REC_OFF + offset, old_len);
220 body->valid &= ~OBD_MD_FLEASIZE;
221 body->eadatasize = 0;
/*
 * mdc_intent_open_pack() - build the enqueue request for an IT_OPEN intent.
 *
 * Visible steps:
 *  - size[] describes the request buffers: ptlrpc body, ldlm_request, the
 *    intent, an mds_rec_create record, the name (data->namelen + 1 bytes)
 *    and an EA buffer of max(@lmmsize, cl_default_mds_easize) bytes;
 *    repsize[] describes the expected reply buffers.
 *  - it->it_create_mode has its file-type bits replaced by S_IFREG.
 *  - For a 2.0 server the record becomes mdt_rec_create, the name and EA
 *    sizes move up two slots behind two zero-length "capa" buffers, and two
 *    lustre_capa reply buffers are added.
 *  - The last request buffer is grown by "round_up(rc) - rc", capped at
 *    cl_max_mds_easize.
 *  - Unused OPEN locks on data->fid2 and UPDATE locks on data->fid1 are
 *    collected on the "cancels" list and passed to ldlm_prep_enqueue_req().
 *  - For a join (M_JOIN_FILE set and data->data non-NULL) a join record is
 *    sized and packed with mdc_join_pack(); the other visible path clears
 *    M_JOIN_FILE from it_create_mode.
 *  - rq_replay is copied from the import's imp_replayable under rq_lock,
 *    the intent opcode is stored, the open record is packed with
 *    mdc_open_pack() and the reply sizes are registered.
 *
 * NOTE(review): the declarations of rc, bufcount, repbufcount, count and
 * mode, the conditions guarding the lock-cancel and join paths, the lock
 * modes chosen, allocation-failure checks and the return statement are not
 * visible in this copy -- confirm against the full source.
 */
225 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
226 struct lookup_intent *it,
227 struct mdc_op_data *data,
228 void *lmm, __u32 lmmsize)
230 struct ptlrpc_request *req;
231 struct ldlm_intent *lit;
232 struct obd_device *obddev = class_exp2obd(exp);
233 __u32 size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
234 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
235 [DLM_INTENT_IT_OFF] = sizeof(*lit),
236 [DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create),
237 [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
238 /* As an optimization, we allocate an RPC request buffer
239 * for at least a default-sized LOV EA even if we aren't
240 * sending one. We grow the whole request to the next
241 * power-of-two size since we get that much from a slab
242 * allocation anyways. This avoids an allocation below
243 * in the common case where we need to save a
244 * default-sized LOV EA for open replay. */
245 [DLM_INTENT_REC_OFF+2]= max(lmmsize,
246 obddev->u.cli.cl_default_mds_easize) };
247 __u32 repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
248 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
249 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
250 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
252 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
253 CFS_LIST_HEAD(cancels);
254 int do_join = (it->it_create_mode & M_JOIN_FILE) && data->data;
262 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
263 if (mdc_exp_is_2_0_server(exp)) {
264 size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create);
265 size[DLM_INTENT_REC_OFF+4] = size[DLM_INTENT_REC_OFF+2];
266 size[DLM_INTENT_REC_OFF+3] = size[DLM_INTENT_REC_OFF+1];
267 size[DLM_INTENT_REC_OFF+2] = 0; /* capa */
268 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
270 repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa);
271 repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa);
274 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
277 size[bufcount - 1] = min(size[bufcount - 1] + round_up(rc) - rc,
278 (__u32)obddev->u.cli.cl_max_mds_easize);
280 /* If inode is known, cancel conflicting OPEN locks. */
282 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
285 else if (it->it_flags & FMODE_EXEC)
290 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
291 mode, MDS_INODELOCK_OPEN);
294 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
295 if (it->it_op & IT_CREAT || do_join)
299 count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
300 MDS_INODELOCK_UPDATE);
302 __u64 head_size = (*(__u64 *)data->data);
303 /* join is like an unlink of the tail */
304 if (mdc_exp_is_2_0_server(exp)) {
305 size[DLM_INTENT_REC_OFF+5]=sizeof(struct mdt_rec_join);
307 size[DLM_INTENT_REC_OFF+3]=sizeof(struct mds_rec_join);
311 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
313 mdc_join_pack(req, bufcount - 1, data, head_size);
315 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
316 it->it_create_mode &= ~M_JOIN_FILE;
320 spin_lock(&req->rq_lock);
321 req->rq_replay = req->rq_import->imp_replayable;
322 spin_unlock(&req->rq_lock);
324 /* pack the intent */
325 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
327 lit->opc = (__u64)it->it_op;
329 /* pack the intended request */
330 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
331 it->it_create_mode, 0, it->it_flags,
334 ptlrpc_req_set_repsize(req, repbufcount, repsize);
/*
 * mdc_intent_unlink_pack() - build the enqueue request for an IT_UNLINK
 * intent.
 *
 * The request has five buffers: ptlrpc body, ldlm_request, the intent, an
 * unlink record (mdt_rec_unlink for a 2.0 server, mds_rec_unlink otherwise)
 * and the name (data->namelen + 1 bytes).  It is created with
 * ldlm_prep_enqueue_req() without a cancel list, the intent opcode is set
 * from it->it_op, the record is packed with mdc_unlink_pack(), and a
 * five-buffer reply layout is registered whose last slot is sized by
 * cl_max_mds_cookiesize.
 *
 * NOTE(review): the size of reply buffer DLM_REPLY_REC_OFF+1, the check
 * of the ldlm_prep_enqueue_req() result and the return statement are not
 * visible in this copy -- confirm against the full source.
 */
339 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
340 struct lookup_intent *it,
341 struct mdc_op_data *data)
343 struct ptlrpc_request *req;
344 struct ldlm_intent *lit;
345 struct obd_device *obddev = class_exp2obd(exp);
346 __u32 size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
347 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
348 [DLM_INTENT_IT_OFF] = sizeof(*lit),
349 [DLM_INTENT_REC_OFF] = mdc_exp_is_2_0_server(exp) ?
350 sizeof(struct mdt_rec_unlink) :
351 sizeof(struct mds_rec_unlink),
352 [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
353 __u32 repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
354 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
355 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
356 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
358 [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
359 cl_max_mds_cookiesize };
362 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
364 /* pack the intent */
365 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
367 lit->opc = (__u64)it->it_op;
369 /* pack the intended request */
370 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
372 ptlrpc_req_set_repsize(req, 5, repsize);
/*
 * mdc_intent_lookup_pack() - build the enqueue request used for lookup and
 * getattr intents.
 *
 * The request carries ptlrpc body, ldlm_request, the intent, an mdt_body
 * and the name (data->namelen + 1 bytes).  For a 2.0 server the name moves
 * one slot up behind a zero-length "capa" buffer.  The getattr record is
 * packed with mdc_getattr_pack() asking for OBD_MD_FLGETATTR, FLEASIZE,
 * FLACL, FLMODEASIZE and FLDIREA, and the reply layout in repsize[] is
 * registered with the same buffer count as the request.
 *
 * NOTE(review): the declaration and value of "bufcount", the size of
 * reply buffer DLM_REPLY_REC_OFF+1, the check of the
 * ldlm_prep_enqueue_req() result and the return statement are not visible
 * in this copy -- confirm against the full source.
 */
377 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
378 struct lookup_intent *it,
379 struct mdc_op_data *data)
381 struct ptlrpc_request *req;
382 struct ldlm_intent *lit;
383 struct obd_device *obddev = class_exp2obd(exp);
384 __u32 size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
385 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
386 [DLM_INTENT_IT_OFF] = sizeof(*lit),
387 [DLM_INTENT_REC_OFF] = sizeof(struct mdt_body),
388 [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
389 [DLM_INTENT_REC_OFF+2]= 0 };
390 __u32 repsize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
391 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
392 [DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
393 [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
395 [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
396 [DLM_REPLY_REC_OFF+3] = 0 };
397 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
398 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
402 if (mdc_exp_is_2_0_server(exp)) {
403 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
404 size[DLM_INTENT_REC_OFF+2] = data->namelen + 1;
407 req = ldlm_prep_enqueue_req(exp, bufcount, size, NULL, 0);
409 /* pack the intent */
410 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
412 lit->opc = (__u64)it->it_op;
414 /* pack the intended request */
415 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
417 ptlrpc_req_set_repsize(req, bufcount, repsize);
/*
 * mdc_intent_readdir_pack() - build the enqueue request used for IT_READDIR.
 *
 * The request has only the ptlrpc body and the ldlm_request (no intent
 * record); the reply is sized for ptlrpc body, ldlm_reply and an ost_lvb.
 *
 * NOTE(review): the check of the ldlm_prep_enqueue_req() result and the
 * return statement are not visible in this copy -- confirm against the
 * full source.
 */
422 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
424 struct ptlrpc_request *req;
425 __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
426 [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
427 __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
428 [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply),
429 [DLM_REPLY_REC_OFF] = sizeof(struct ost_lvb) };
432 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
434 ptlrpc_req_set_repsize(req, 3, repsize);
/*
 * mdc_finish_enqueue() - post-process the reply of an intent enqueue.
 *
 * Visible steps:
 *  - If the request has a transno or is flagged for replay, the saved
 *    ldlm_request gets LDLM_FL_INTENT_ONLY so that a replay only performs
 *    the intent.
 *  - For rc == ELDLM_LOCK_ABORTED the lock handle is zeroed.  Otherwise, if
 *    the lock's l_req_mode differs from einfo->ei_mode, the reference is
 *    moved to the new mode and einfo->ei_mode is updated.
 *  - The disposition and status are taken from lock_policy_res1/res2 of
 *    the ldlm_reply and stored in the intent together with the lock mode,
 *    the handle cookie and the request itself (it_data).
 *  - A negative status on a replayable request clears the replay flag.
 *  - For OPEN/UNLINK/LOOKUP/GETATTR intents the mds_body is swabbed; for a
 *    successful open mdc_set_open_replay_data() is called before the EA is
 *    handled; when the body advertises an EA (OBD_MD_FLEASIZE) its presence
 *    is checked and, for IT_OPEN, it is copied into the request buffer
 *    (after mdc_realloc_openmsg() when the buffer length does not match)
 *    so that it is available for replay.
 *
 * NOTE(review): the last parameter (presumably "int rc"), several local
 * declarations, the error returns after the CERROR() calls, the release of
 * the lock reference taken by ldlm_handle2lock() and the final return are
 * not visible in this copy.  The comment starting "If we're doing an
 * IT_OPEN" has no visible terminator here, so the IT_OPEN replay-flag check
 * that follows it must be verified against the full source.
 */
438 static int mdc_finish_enqueue(struct obd_export *exp,
439 struct ptlrpc_request *req,
440 struct ldlm_enqueue_info *einfo,
441 struct lookup_intent *it,
442 struct lustre_handle *lockh,
445 struct ldlm_request *lockreq;
446 struct ldlm_reply *lockrep;
450 /* Similarly, if we're going to replay this request, we don't want to
451 * actually get a lock, just perform the intent. */
452 if (req->rq_transno || req->rq_replay) {
453 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
455 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
458 if (rc == ELDLM_LOCK_ABORTED) {
460 memset(lockh, 0, sizeof(*lockh));
462 } else { /* rc = 0 */
463 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
466 /* If the server gave us back a different lock mode, we should
467 * fix up our variables. */
468 if (lock->l_req_mode != einfo->ei_mode) {
469 ldlm_lock_addref(lockh, lock->l_req_mode);
470 ldlm_lock_decref(lockh, einfo->ei_mode);
471 einfo->ei_mode = lock->l_req_mode;
476 lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
478 LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
479 /* swabbed by ldlm_cli_enqueue() */
480 LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
482 it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
483 it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
484 it->d.lustre.it_lock_mode = einfo->ei_mode;
485 it->d.lustre.it_lock_handle = lockh->cookie;
486 it->d.lustre.it_data = req;
488 if (it->d.lustre.it_status < 0 && req->rq_replay)
489 mdc_clear_replay_flag(req, it->d.lustre.it_status);
491 /* If we're doing an IT_OPEN which did not result in an actual
492 * successful open, then we need to remove the bit which saves
493 * this request for unconditional replay.
495 * It's important that we do this first! Otherwise we might exit the
496 * function without doing so, and try to replay a failed create
498 if ((it->it_op & IT_OPEN) &&
500 (!it_disposition(it, DISP_OPEN_OPEN) ||
501 it->d.lustre.it_status != 0))
502 mdc_clear_replay_flag(req, it->d.lustre.it_status);
504 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
505 it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
507 /* We know what to expect, so we do any byte flipping required here */
508 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
509 struct mds_body *body;
511 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
512 lustre_swab_mds_body);
514 CERROR ("Can't swab mds_body\n");
518 /* If this is a successful OPEN request, we need to set
519 replay handler and data early, so that if replay happens
520 immediately after swabbing below, new reply is swabbed
521 by that handler correctly */
522 if (it_disposition(it, DISP_OPEN_OPEN) &&
523 !it_open_error(DISP_OPEN_OPEN, it))
524 mdc_set_open_replay_data(NULL, req);
526 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
529 mdc_update_max_ea_from_body(exp, body);
531 /* The eadata is opaque; just check that it is there.
532 * Eventually, obd_unpackmd() will check the contents */
533 eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
534 body->eadatasize, NULL);
535 if (eadata == NULL) {
536 CERROR ("Missing/short eadata\n");
539 /* We save the reply LOV EA in case we have to replay
540 * a create for recovery. If we didn't allocate a
541 * large enough request buffer above we need to
542 * reallocate it here to hold the actual LOV EA. */
543 if (it->it_op & IT_OPEN) {
544 int offset = DLM_INTENT_REC_OFF;
547 if (mdc_req_is_2_0_server(req))
552 if (lustre_msg_buflen(req->rq_reqmsg, offset) !=
554 mdc_realloc_openmsg(req, body);
556 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
559 memcpy(lmm, eadata, body->eadatasize);
567 /* We always reserve enough space in the reply packet for a stripe MD, because
568 * we don't know in advance the file type. */
/*
 * mdc_enqueue() - pack and send an intent enqueue for an inodebits lock.
 *
 * Visible steps:
 *  - The resource id is built from data->fid1 and einfo->ei_type is
 *    asserted to be LDLM_IBITS.
 *  - The policy starts as MDS_INODELOCK_LOOKUP and becomes
 *    MDS_INODELOCK_UPDATE for UNLINK, GETATTR and READDIR intents, and for
 *    an open whose create mode has M_JOIN_FILE set.
 *  - The request is packed according to it->it_op: open (for IT_CREAT on a
 *    2.0 server data->fid2 is first saved in data->fid3 and replaced by a
 *    fid from mdc_fid_alloc()), unlink, getattr/lookup or readdir; any
 *    other op is reported with CERROR("bad it_op ...").
 *  - The RPC lock is taken before mdc_enter_request() and both are released
 *    in reverse order after ldlm_cli_enqueue() returns.
 *  - One visible path logs the enqueue result, clears the replay flag and
 *    drops the request; the other hands the reply to mdc_finish_enqueue().
 *
 * NOTE(review): the declaration of rc, the conditions guarding the error
 * paths, the trailing arguments of ldlm_cli_enqueue() and all return
 * statements are not visible in this copy -- confirm against the full
 * source.
 */
569 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
570 struct lookup_intent *it, struct mdc_op_data *data,
571 struct lustre_handle *lockh, void *lmm, int lmmsize,
572 int extra_lock_flags)
574 struct ptlrpc_request *req;
575 struct obd_device *obddev = class_exp2obd(exp);
576 struct ldlm_res_id res_id;
577 ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
578 int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
582 fid_build_reg_res_name((void *)&data->fid1, &res_id);
583 LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
584 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
585 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
587 if (it->it_op & IT_OPEN) {
588 if ((it->it_op & IT_CREAT) && mdc_exp_is_2_0_server(exp)) {
589 struct client_obd *cli = &obddev->u.cli;
590 data->fid3 = data->fid2;
591 rc = mdc_fid_alloc(cli->cl_seq, (void *)&data->fid2);
593 CERROR("fid allocation result: %d\n", rc);
597 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
598 if (it->it_create_mode & M_JOIN_FILE) {
599 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
601 } else if (it->it_op & IT_UNLINK) {
602 req = mdc_intent_unlink_pack(exp, it, data);
603 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
604 req = mdc_intent_lookup_pack(exp, it, data);
605 } else if (it->it_op == IT_READDIR) {
606 req = mdc_intent_readdir_pack(exp);
608 CERROR("bad it_op %x\n", it->it_op);
615 /* It is important to obtain rpc_lock first (if applicable), so that
616 * threads that are serialised with rpc_lock are not polluting our
617 * rpcs in flight counter */
618 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
619 mdc_enter_request(&obddev->u.cli);
620 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
622 mdc_exit_request(&obddev->u.cli);
623 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
625 CERROR("ldlm_cli_enqueue: %d\n", rc);
626 mdc_clear_replay_flag(req, rc);
627 ptlrpc_req_finished(req);
630 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
634 EXPORT_SYMBOL(mdc_enqueue);
/*
 * mdc_revalidate_lock() - look for an already held lock that covers an
 * intent.
 *
 * A resource id is built from "fid" and ldlm_lock_match() is asked for a
 * granted IBITS lock in any of the modes CR, CW, PR or PW.  The wanted
 * bits are UPDATE|LOOKUP for IT_GETATTR and LOOKUP alone otherwise.  The
 * matched handle and mode are stored in the intent
 * (it_lock_handle / it_lock_mode).
 *
 * NOTE(review): the last parameter (the "fid" used below), the declaration
 * of "mode", the condition guarding the stores into the intent and the
 * return statement are not visible in this copy.  The opening comment of
 * the body has no visible terminator here -- confirm against the full
 * source.
 */
636 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
639 /* We could just return 1 immediately, but since we should only
640 * be called in revalidate_it if we already have a lock, let's
642 struct ldlm_res_id res_id;
643 struct lustre_handle lockh;
644 ldlm_policy_data_t policy;
648 fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
649 /* As not all attributes are kept under update lock, e.g.
650 owner/group/acls are under lookup lock, we need both
651 ibits for GETATTR. */
652 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
653 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
654 MDS_INODELOCK_LOOKUP;
656 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
657 LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
658 &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
660 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
661 it->d.lustre.it_lock_mode = mode;
666 EXPORT_SYMBOL(mdc_revalidate_lock);
/*
 * mdc_finish_intent_lock() - interpret an executed intent and settle the
 * lock and request references.
 *
 * Visible steps:
 *  - If the server never executed the intent (DISP_IT_EXECD clear) the
 *    non-zero it_status is returned.
 *  - The mds_body of the reply is fetched; it was already checked and
 *    swabbed by the enqueue path.
 *  - When revalidating a known fid (data->fid2.id set, op is not
 *    IT_GETATTR, and either a pre-2.0 server or M_CHECK_STALE requested)
 *    DISP_ENQ_COMPLETE is set and the reply fid is compared against both
 *    data->fid2 and data->fid3.
 *  - A successful create takes one extra request reference
 *    (DISP_ENQ_CREATE_REF) and a successful open another
 *    (DISP_ENQ_OPEN_REF); the in-line comments name where each is dropped.
 *  - If ldlm_lock_match() finds an existing lock with the same policy, the
 *    new lock is dropped with ldlm_lock_decref_and_cancel() and the old
 *    handle is copied into @lockh and the intent.
 *
 * NOTE(review): the declaration of rc, the checks on the it_open_error()
 * results, the action taken when the fids differ, the release of the lock
 * reference taken by ldlm_handle2lock() and the return statements are not
 * visible in this copy.  The comment starting "keep requests around" has
 * no visible terminator here -- confirm against the full source.
 */
668 static int mdc_finish_intent_lock(struct obd_export *exp,
669 struct ptlrpc_request *req,
670 struct mdc_op_data *data,
671 struct lookup_intent *it,
672 struct lustre_handle *lockh)
674 struct mds_body *mds_body;
675 struct lustre_handle old_lock;
676 struct ldlm_lock *lock;
680 LASSERT(req != NULL);
681 LASSERT(req != LP_POISON);
682 LASSERT(req->rq_repmsg != LP_POISON);
684 if (!it_disposition(it, DISP_IT_EXECD)) {
685 /* The server failed before it even started executing the
686 * intent, i.e. because it couldn't unpack the request. */
687 LASSERT(it->d.lustre.it_status != 0);
688 RETURN(it->d.lustre.it_status);
690 rc = it_open_error(DISP_IT_EXECD, it);
694 mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
696 /* mdc_enqueue checked */
697 LASSERT(mds_body != NULL);
698 /* mdc_enqueue swabbed */
699 LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
701 /* If we were revalidating a fid/name pair, mark the intent in
702 * case we fail and get called again from lookup */
704 if (data->fid2.id && (it->it_op != IT_GETATTR) &&
705 ( !mdc_exp_is_2_0_server(exp) ||
706 (mdc_exp_is_2_0_server(exp) && (it->it_create_mode & M_CHECK_STALE)))) {
707 it_set_disposition(it, DISP_ENQ_COMPLETE);
709 /* Also: did we find the same inode? */
710 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)) &&
711 memcmp(&data->fid3, &mds_body->fid1, sizeof(data->fid3)))
715 rc = it_open_error(DISP_LOOKUP_EXECD, it);
719 /* keep requests around for the multiple phases of the call
720 * this shows the DISP_XX must guarantee we make it into the call
722 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
723 it_disposition(it, DISP_OPEN_CREATE) &&
724 !it_open_error(DISP_OPEN_CREATE, it)) {
725 it_set_disposition(it, DISP_ENQ_CREATE_REF);
726 ptlrpc_request_addref(req); /* balanced in ll_create_node */
728 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
729 it_disposition(it, DISP_OPEN_OPEN) &&
730 !it_open_error(DISP_OPEN_OPEN, it)) {
731 it_set_disposition(it, DISP_ENQ_OPEN_REF);
732 ptlrpc_request_addref(req); /* balanced in ll_file_open */
733 /* BUG 11546 - eviction in the middle of open rpc processing */
734 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
737 if (it->it_op & IT_CREAT) {
738 /* XXX this belongs in ll_create_it */
739 } else if (it->it_op == IT_OPEN) {
740 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
742 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
745 /* If we already have a matching lock, then cancel the new
746 * one. We have to set the data here instead of in
747 * mdc_enqueue, because we need to use the child's inode as
748 * the l_ast_data to match, and that's not available until
749 * intent_finish has performed the iget().) */
750 lock = ldlm_handle2lock(lockh);
752 ldlm_policy_data_t policy = lock->l_policy_data;
754 LDLM_DEBUG(lock, "matching against this");
756 memcpy(&old_lock, lockh, sizeof(*lockh));
757 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
758 LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
759 ldlm_lock_decref_and_cancel(lockh,
760 it->d.lustre.it_lock_mode);
761 memcpy(lockh, &old_lock, sizeof(old_lock));
762 memcpy(&it->d.lustre.it_lock_handle, lockh,
767 CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
768 data->namelen, data->name, ldlm_it2str(it->it_op),
769 it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
/*
 * This long block is all about fixing up the lock and request state
 * so that it is correct as of the moment _before_ the operation was
 * applied; that way, the VFS will think that everything is normal and
 * call Lustre's regular VFS methods.
 *
 * If we're performing a creation, that means that unless the creation
 * failed with EEXIST, we should fake up a negative dentry.
 *
 * For everything else, we want the lookup to succeed.
 *
 * One additional note: if CREATE or OPEN succeeded, we add an extra
 * reference to the request because we need to keep it around until
 * ll_create/ll_open gets called.
 *
 * The server will return to us, in it_disposition, an indication of
 * exactly what d.lustre.it_status refers to.
 *
 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open()
 * call; otherwise, if DISP_OPEN_CREATE is set, then it_status is the
 * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * was successful.
 *
 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
 * child lookup.
 */
/*
 * mdc_intent_lock() - entry point for executing a lookup intent.
 *
 * Visible steps:
 *  - For IT_LOOKUP/IT_GETATTR on a known child fid (op_data->fid2.id set)
 *    an existing lock is looked for first with mdc_revalidate_lock().
 *  - Unless the intent is already marked DISP_ENQ_COMPLETE, a new
 *    LDLM_IBITS enqueue is issued through mdc_enqueue() using the lock mode
 *    from it_to_lock_mode(), @cb_blocking as the blocking callback and
 *    ldlm_completion_ast as the completion callback.
 *  - If DISP_ENQ_COMPLETE was set and no child fid is known, the flag is
 *    cleared and the request saved by the earlier revalidate is reused.
 *  - The request stored in the intent is returned through @reqp and
 *    post-processed by mdc_finish_intent_lock().
 *
 * NOTE(review): the declaration of rc, the action taken after
 * mdc_revalidate_lock() and after a failed mdc_enqueue(), and the return
 * statement are not visible in this copy.  @lookup_flags is not referenced
 * by any visible line -- confirm against the full source.
 */
800 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
801 void *lmm, int lmmsize, struct lookup_intent *it,
802 int lookup_flags, struct ptlrpc_request **reqp,
803 ldlm_blocking_callback cb_blocking, int extra_lock_flags)
805 struct lustre_handle lockh;
811 CDEBUG(D_DLMTRACE,"name: %.*s("DFID") in inode ("DFID"), "
812 "intent: %s flags %#o\n",
813 op_data->namelen, op_data->name,
814 PFID(((void *)&op_data->fid2)),
815 PFID(((void *)&op_data->fid1)),
816 ldlm_it2str(it->it_op), it->it_flags);
819 if (op_data->fid2.id &&
820 (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
821 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
822 /* Only return failure if it was not GETATTR by cfid
823 (from inode_revalidate) */
824 if (rc || op_data->namelen != 0)
828 /* lookup_it may be called only after revalidate_it has run, because
829 * revalidate_it cannot return errors, only zero. Returning zero causes
830 * this call to lookup, which *can* return an error.
832 * We only want to execute the request associated with the intent one
833 * time, however, so don't send the request again. Instead, skip past
834 * this and use the request from revalidate. In this case, revalidate
835 * never dropped its reference, so the refcounts are all OK */
836 if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
837 struct ldlm_enqueue_info einfo =
838 { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
839 ldlm_completion_ast, NULL, NULL };
841 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
842 lmm, lmmsize, extra_lock_flags);
845 } else if (!op_data->fid2.id) {
846 /* DISP_ENQ_COMPLETE set means there is extra reference on
847 * request referenced from this intent, saved for subsequent
848 * lookup. This path is executed when we proceed to this
849 * lookup, so we clear DISP_ENQ_COMPLETE */
850 it_clear_disposition(it, DISP_ENQ_COMPLETE);
853 *reqp = it->d.lustre.it_data;
854 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
858 EXPORT_SYMBOL(mdc_intent_lock);
/*
 * mdc_intent_getattr_async_interpret() - reply handler installed by
 * mdc_intent_getattr_async().
 *
 * The export, the md_enqueue_info and the ldlm_enqueue_info are recovered
 * from req->rq_async_args.pointer_arg[0..2].  The in-flight accounting is
 * released with mdc_exit_request(), the enqueue is completed with
 * ldlm_cli_enqueue_fini(), and the reply is then run through
 * mdc_finish_enqueue() and mdc_finish_intent_lock().  The caller's
 * minfo->mi_cb callback is invoked with the resulting rc.
 *
 * NOTE(review): the assignment of "it" (presumably &minfo->mi_it), the
 * effect of the OBD_FAIL_MDC_GETATTR_ENQUEUE check, the conditions and
 * jumps between the steps, and the return statement are not visible in
 * this copy -- confirm against the full source.
 */
860 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
861 void *unused, int rc)
863 struct obd_export *exp = req->rq_async_args.pointer_arg[0];
864 struct md_enqueue_info *minfo = req->rq_async_args.pointer_arg[1];
865 struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
866 struct lookup_intent *it;
867 struct lustre_handle *lockh;
868 struct obd_device *obddev;
869 int flags = LDLM_FL_HAS_INTENT;
873 lockh = &minfo->mi_lockh;
875 obddev = class_exp2obd(exp);
877 mdc_exit_request(&obddev->u.cli);
878 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
881 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
882 &flags, NULL, 0, NULL, lockh, rc);
884 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
885 mdc_clear_replay_flag(req, rc);
889 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
893 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
897 minfo->mi_cb(exp, req, minfo, rc);
/*
 * mdc_intent_getattr_async() - queue a getattr intent enqueue and let
 * ptlrpcd complete it.
 *
 * The request is packed with mdc_intent_lookup_pack() for the intent and
 * op data embedded in @minfo, accounted with mdc_enter_request() and
 * handed to ldlm_cli_enqueue() for the resource named by op_data->fid1
 * with a LOOKUP inodebits policy (the final argument 1 presumably selects
 * asynchronous mode -- TODO confirm).  @exp, @minfo and @einfo are stored
 * in the request's async args, mdc_intent_getattr_async_interpret() is
 * installed as the reply interpreter and the request is queued with
 * ptlrpcd_add_req().
 *
 * NOTE(review): the declaration of rc, the checks after the pack and
 * enqueue calls (one visible path calls mdc_exit_request()), and the
 * return statement are not visible in this copy -- confirm against the
 * full source.
 */
902 int mdc_intent_getattr_async(struct obd_export *exp,
903 struct md_enqueue_info *minfo,
904 struct ldlm_enqueue_info *einfo)
906 struct mdc_op_data *op_data = &minfo->mi_data;
907 struct lookup_intent *it = &minfo->mi_it;
908 struct ptlrpc_request *req;
909 struct obd_device *obddev = class_exp2obd(exp);
910 struct ldlm_res_id res_id;
911 ldlm_policy_data_t policy = {
912 .l_inodebits = { MDS_INODELOCK_LOOKUP }
915 int flags = LDLM_FL_HAS_INTENT;
918 CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
919 op_data->namelen, op_data->name, op_data->fid1.id,
920 ldlm_it2str(it->it_op), it->it_flags);
922 fid_build_reg_res_name((void *)&op_data->fid1, &res_id);
923 req = mdc_intent_lookup_pack(exp, it, op_data);
927 mdc_enter_request(&obddev->u.cli);
928 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
929 0, NULL, &minfo->mi_lockh, 1);
931 mdc_exit_request(&obddev->u.cli);
935 req->rq_async_args.pointer_arg[0] = exp;
936 req->rq_async_args.pointer_arg[1] = minfo;
937 req->rq_async_args.pointer_arg[2] = einfo;
938 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
939 ptlrpcd_add_req(req);
943 EXPORT_SYMBOL(mdc_intent_getattr_async);