Whamcloud - gitweb
LU-313 tests: re-enable lfsck test to run by default
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <obd_class.h>
52 #include <lustre_dlm.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
/* Argument bundle for a getattr enqueue: the export it is issued on plus
 * the md-level and ldlm-level enqueue descriptors.
 * NOTE(review): nothing in the visible part of this file uses the struct;
 * presumably it is stashed for an asynchronous completion callback --
 * confirm against its user. */
struct mdc_getattr_args {
        struct obd_export           *ga_exp;    /* export */
        struct md_enqueue_info      *ga_minfo;  /* md enqueue descriptor */
        struct ldlm_enqueue_info    *ga_einfo;  /* ldlm enqueue descriptor */
};
61
62 int it_open_error(int phase, struct lookup_intent *it)
63 {
64         if (it_disposition(it, DISP_OPEN_OPEN)) {
65                 if (phase >= DISP_OPEN_OPEN)
66                         return it->d.lustre.it_status;
67                 else
68                         return 0;
69         }
70
71         if (it_disposition(it, DISP_OPEN_CREATE)) {
72                 if (phase >= DISP_OPEN_CREATE)
73                         return it->d.lustre.it_status;
74                 else
75                         return 0;
76         }
77
78         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79                 if (phase >= DISP_LOOKUP_EXECD)
80                         return it->d.lustre.it_status;
81                 else
82                         return 0;
83         }
84
85         if (it_disposition(it, DISP_IT_EXECD)) {
86                 if (phase >= DISP_IT_EXECD)
87                         return it->d.lustre.it_status;
88                 else
89                         return 0;
90         }
91         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92                it->d.lustre.it_status);
93         LBUG();
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 void mdc_set_lock_data(__u64 *l, void *data, __u32 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct lustre_handle *lockh = (struct lustre_handle *)l;
103         ENTRY;
104
105         if(bits)
106                 *bits = 0;
107
108         if (!*l) {
109                 EXIT;
110                 return;
111         }
112
113         lock = ldlm_handle2lock(lockh);
114
115         LASSERT(lock != NULL);
116         lock_res_and_lock(lock);
117 #ifdef __KERNEL__
118         if (lock->l_ast_data && lock->l_ast_data != data) {
119                 struct inode *new_inode = data;
120                 struct inode *old_inode = lock->l_ast_data;
121                 LASSERTF(old_inode->i_state & I_FREEING,
122                          "Found existing inode %p/%lu/%u state %lu in lock: "
123                          "setting data to %p/%lu/%u\n", old_inode,
124                          old_inode->i_ino, old_inode->i_generation,
125                          old_inode->i_state,
126                          new_inode, new_inode->i_ino, new_inode->i_generation);
127         }
128 #endif
129         lock->l_ast_data = data;
130         if (bits)
131                 *bits = lock->l_policy_data.l_inodebits.bits;
132         unlock_res_and_lock(lock);
133         LDLM_LOCK_PUT(lock);
134
135         EXIT;
136 }
137 EXPORT_SYMBOL(mdc_set_lock_data);
138
139 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, 
140                       ldlm_iterator_t it, void *data)
141 {
142         struct ldlm_res_id res_id;
143         ENTRY;
144
145         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
146         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
147                               it, data);
148
149         EXIT;
150         return 0;
151 }
152
153 /* find any ldlm lock of the inode in mdc
154  * return 0    not find
155  *        1    find one
156  *      < 0    error */
157 int mdc_find_cbdata(struct obd_export *exp, struct ll_fid *fid,
158                     ldlm_iterator_t it, void *data)
159 {
160         struct ldlm_res_id res_id;
161         int rc = 0;
162         ENTRY;
163
164         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
165         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
166                                    it, data);
167         if (rc == LDLM_ITER_STOP)
168                 RETURN(1);
169         else if (rc == LDLM_ITER_CONTINUE)
170                 RETURN(0);
171         RETURN(rc);
172 }
173
174 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
175 {
176         /* Don't hold error requests for replay. */
177         if (req->rq_replay) {
178                 spin_lock(&req->rq_lock);
179                 req->rq_replay = 0;
180                 spin_unlock(&req->rq_lock);
181         }
182         if (rc && req->rq_transno != 0) {
183                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
184                 LBUG();
185         }
186 }
187
/* Return the smallest power of two that is strictly greater than "val".
 *
 * Note "strictly": an exact power of two is bumped to the next one
 * (l_round_up(4) == 8), and l_round_up(0) == 1.
 *
 * Values <= 0 yield 1.  A negative value used to spin forever, because an
 * arithmetic right shift never brings it to zero, while the result was
 * shifted out of range.  The doubling is done in unsigned arithmetic so
 * that it never left-shifts a signed value out of range; for val >= 2^30
 * the mathematical result (2^31) does not fit in an int and the returned
 * value is whatever the unsigned-to-int conversion produces. */
static int l_round_up(int val)
{
        unsigned int rest;
        unsigned int pow2 = 1;

        if (val <= 0)
                return 1;

        /* One doubling per significant bit of val. */
        for (rest = (unsigned int)val; rest != 0; rest >>= 1)
                pow2 <<= 1;

        return (int)pow2;
}
197
198 /* Save a large LOV EA into the request buffer so that it is available
199  * for replay.  We don't do this in the initial request because the
200  * original request doesn't need this buffer (at most it sends just the
201  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
202  * buffer and may also be difficult to allocate and save a very large
203  * request buffer for each open. (bug 5707)
204  *
205  * OOM here may cause recovery failure if lmm is needed (only for the
206  * original open if the MDS crashed just when this client also OOM'd)
207  * but this is incredibly unlikely, and questionable whether the client
208  * could do MDS recovery under OOM anyways... */
209 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
210                                 struct mds_body *body)
211 {
212         int old_len, new_size, old_size;
213         struct lustre_msg *old_msg = req->rq_reqmsg;
214         struct lustre_msg *new_msg;
215         int offset;
216
217         if (mdc_req_is_2_0_server(req))
218                 offset = 4;
219         else
220                 offset = 2;
221
222         old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + offset);
223         old_size = lustre_packed_msg_size(old_msg);
224         lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + offset,
225                               body->eadatasize);
226         /* old buffer is more then need */
227         if (old_len > body->eadatasize)
228                 return;
229
230         new_size = lustre_packed_msg_size(old_msg);
231
232         OBD_ALLOC(new_msg, new_size);
233         if (new_msg != NULL) {
234                 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
235                           body->eadatasize);
236                 memcpy(new_msg, old_msg, old_size);
237
238                 spin_lock(&req->rq_lock);
239                 req->rq_reqmsg = new_msg;
240                 req->rq_reqlen = new_size;
241                 spin_unlock(&req->rq_lock);
242
243                 OBD_FREE(old_msg, old_size);
244         } else {
245                 lustre_msg_set_buflen(old_msg,
246                                       DLM_INTENT_REC_OFF + offset, old_len);
247                 body->valid &= ~OBD_MD_FLEASIZE;
248                 body->eadatasize = 0;
249         }
250 }
251
/* Build the enqueue request for an open intent.
 *
 * Sizes the request and reply buffers (the layout differs between a 2.0
 * server and an older one), gathers unused locks to cancel along with the
 * enqueue, prepares the request through ldlm_prep_enqueue_req() and packs
 * the intent opcode and the open record into it.
 *
 * Side effects on "it": it_create_mode has its S_IFMT bits replaced by
 * S_IFREG, and M_JOIN_FILE is cleared when this is not a join.
 *
 * Returns the prepared request, or NULL if ldlm_prep_enqueue_req() did not
 * return one. */
static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
                                                   struct lookup_intent *it,
                                                   struct mdc_op_data *data,
                                                   void *lmm, __u32 lmmsize)
{
        struct ptlrpc_request *req;
        struct ldlm_intent *lit;
        struct obd_device *obddev = class_exp2obd(exp);
        /* Request buffers in the pre-2.0 layout: create record, name, EA.
         * Nine slots leave room for the 2.0 layout (8 buffers) plus the
         * optional join record. */
        __u32 size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_INTENT_IT_OFF]   = sizeof(*lit),
                        [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_create),
                        [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
                        /* As an optimization, we allocate an RPC request buffer
                         * for at least a default-sized LOV EA even if we aren't
                         * sending one.  We grow the whole request to the next
                         * power-of-two size since we get that much from a slab
                         * allocation anyways. This avoids an allocation below
                         * in the common case where we need to save a
                         * default-sized LOV EA for open replay. */
                        [DLM_INTENT_REC_OFF+2]= max(lmmsize,
                                         obddev->u.cli.cl_default_mds_easize) };
        /* Reply buffers: body, EA and ACL; the last two slots are only
         * filled in (with capa sizes) for a 2.0 server. */
        __u32 repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
                           [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
                           [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
                                                        cl_max_mds_easize,
                           [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
        CFS_LIST_HEAD(cancels);
        /* A join needs both the M_JOIN_FILE mode bit and join data. */
        int do_join = (it->it_create_mode & M_JOIN_FILE) && data->data;
        int count = 0;          /* number of locks collected on "cancels" */
        int bufcount = 6;
        int repbufcount = 5;
        int mode;
        int rc;
        ENTRY;

        /* Force the file type bits of the create mode to "regular file". */
        it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
        if (mdc_exp_is_2_0_server(exp)) {
                /* 2.0 layout: two zero-sized capa slots go in front of the
                 * name and the EA, which therefore move up by two. */
                size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create);
                size[DLM_INTENT_REC_OFF+4] = size[DLM_INTENT_REC_OFF+2];
                size[DLM_INTENT_REC_OFF+3] = size[DLM_INTENT_REC_OFF+1];
                size[DLM_INTENT_REC_OFF+2] = 0; /* capa */
                size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
                bufcount = 8;
                repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa);
                repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa);
                repbufcount = 7;
        }
        rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
                             bufcount, size);
        /* If the packed size is not already a power of two, give the slack
         * up to the next power of two to the last buffer (the EA at this
         * point), but never let that buffer exceed cl_max_mds_easize. */
        if (rc & (rc - 1))
                size[bufcount - 1] = min(size[bufcount - 1] +
                                         l_round_up(rc) - rc,
                                        (__u32)obddev->u.cli.cl_max_mds_easize);

        /* If inode is known, cancel conflicting OPEN locks. */
        if (data->fid2.id) {
                if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
                        mode = LCK_CW;
#ifdef FMODE_EXEC
                else if (it->it_flags & FMODE_EXEC)
                        mode = LCK_PR;
#endif
                else
                        mode = LCK_CR;
                count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
                                                mode, MDS_INODELOCK_OPEN);
        }

        /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
        if (it->it_op & IT_CREAT || do_join)
                mode = LCK_EX;
        else
                mode = LCK_CR;
        count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
                                         MDS_INODELOCK_UPDATE);
        if (do_join) {
                /* data->data is read as the 64-bit head size. */
                __u64 head_size = (*(__u64 *)data->data);
                /* join is like an unlink of the tail */
                if (mdc_exp_is_2_0_server(exp)) {
                        size[DLM_INTENT_REC_OFF+5]=sizeof(struct mdt_rec_join);
                } else {
                        size[DLM_INTENT_REC_OFF+3]=sizeof(struct mds_rec_join);
                }
                /* The join record becomes the new last buffer. */
                bufcount++;

                req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
                if (req)
                        mdc_join_pack(req, bufcount - 1, data, head_size);
        } else {
                req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
                /* Not a join: make sure the join bit is not sent. */
                it->it_create_mode &= ~M_JOIN_FILE;
        }

        if (req) {
                /* Keep the request for replay if the import is replayable. */
                spin_lock(&req->rq_lock);
                req->rq_replay = req->rq_import->imp_replayable;
                spin_unlock(&req->rq_lock);

                /* pack the intent */
                lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
                                     sizeof(*lit));
                lit->opc = (__u64)it->it_op;

                /* pack the intended request */
                mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
                              it->it_create_mode, 0, it->it_flags,
                              lmm, lmmsize);

                ptlrpc_req_set_repsize(req, repbufcount, repsize);
        }
        RETURN(req);
}
366
367 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
368                                                      struct lookup_intent *it,
369                                                      struct mdc_op_data *data)
370 {
371         struct ptlrpc_request *req;
372         struct ldlm_intent *lit;
373         struct obd_device *obddev = class_exp2obd(exp);
374         __u32 size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
375                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
376                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
377                         [DLM_INTENT_REC_OFF]  = mdc_exp_is_2_0_server(exp) ?
378                                                 sizeof(struct mdt_rec_unlink) :
379                                                 sizeof(struct mds_rec_unlink),
380                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
381         __u32 repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
382                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
383                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
384                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
385                                                         cl_max_mds_easize,
386                            [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
387                                                         cl_max_mds_cookiesize };
388         ENTRY;
389
390         req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
391         if (req) {
392                 /* pack the intent */
393                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
394                                      sizeof(*lit));
395                 lit->opc = (__u64)it->it_op;
396
397                 /* pack the intended request */
398                 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
399
400                 ptlrpc_req_set_repsize(req, 5, repsize);
401         }
402         RETURN(req);
403 }
404
405 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
406                                                      struct lookup_intent *it,
407                                                      struct mdc_op_data *data)
408 {
409         struct ptlrpc_request *req;
410         struct ldlm_intent *lit;
411         struct obd_device *obddev = class_exp2obd(exp);
412         __u32 size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
413                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
414                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
415                         [DLM_INTENT_REC_OFF]  = sizeof(struct mdt_body),
416                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
417                         [DLM_INTENT_REC_OFF+2]= 0 };
418         __u32 repsize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
419                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
420                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
421                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
422                                                         cl_max_mds_easize,
423                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
424                            [DLM_REPLY_REC_OFF+3] = 0 };
425         obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
426                           OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
427         int bufcount = 5;
428         ENTRY;
429
430         if (mdc_exp_is_2_0_server(exp)) {
431                 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
432                 size[DLM_INTENT_REC_OFF+2] = data->namelen + 1;
433                 bufcount = 6;
434         }
435         req = ldlm_prep_enqueue_req(exp, bufcount, size, NULL, 0);
436         if (req) {
437                 /* pack the intent */
438                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
439                                      sizeof(*lit));
440                 lit->opc = (__u64)it->it_op;
441
442                 /* pack the intended request */
443                 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
444                                  data, obddev->u.cli.cl_max_mds_easize);
445                 ptlrpc_req_set_repsize(req, bufcount, repsize);
446         }
447         RETURN(req);
448 }
449
450 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
451 {
452         struct ptlrpc_request *req;
453         __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
454                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
455         __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
456                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
457                            [DLM_REPLY_REC_OFF] = sizeof(struct ost_lvb) };
458         ENTRY;
459
460         req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
461         if (req)
462                 ptlrpc_req_set_repsize(req, 3, repsize);
463         RETURN(req);
464 }
465
/* Post-process the reply of an intent enqueue.
 *
 * In order, this:
 *  - marks the request intent-only when it has a transno or is kept for
 *    replay;
 *  - clears the lock handle and mode if the enqueue was aborted, otherwise
 *    moves the caller's lock reference to the mode actually recorded on the
 *    lock;
 *  - copies disposition, status, lock mode, lock handle and the request
 *    pointer into the intent;
 *  - drops the replay flag for failed operations and for opens that did not
 *    actually open the file;
 *  - for open/unlink/lookup/getattr intents, swabs the reply body and checks
 *    the EA buffer, saving the EA into the request for open replay.
 *
 * "rc" is the enqueue result and must be >= 0 (asserted).
 *
 * Returns -EPROTO if the reply body or the EA buffer is missing/short,
 * 0 if the enqueue was aborted (ELDLM_LOCK_ABORTED), otherwise "rc"
 * unchanged.
 *
 * NOTE(review): on the -EPROTO paths the intent already points at "req"
 * and holds the lock handle; releasing them is presumably left to the
 * caller -- confirm. */
static int mdc_finish_enqueue(struct obd_export *exp,
                              struct ptlrpc_request *req,
                              struct ldlm_enqueue_info *einfo,
                              struct lookup_intent *it,
                              struct lustre_handle *lockh,
                              int rc)
{
        struct ldlm_request *lockreq;
        struct ldlm_reply *lockrep;
        ENTRY;

        LASSERT(rc >= 0);
        /* Similarly, if we're going to replay this request, we don't want to
         * actually get a lock, just perform the intent. */
        if (req->rq_transno || req->rq_replay) {
                lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
                                         sizeof(*lockreq));
                lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
        }

        if (rc == ELDLM_LOCK_ABORTED) {
                /* No lock was granted: hand back an empty handle and mode,
                 * and treat the enqueue itself as successful. */
                einfo->ei_mode = 0;
                memset(lockh, 0, sizeof(*lockh));
                rc = 0;
        } else { /* rc = 0 */
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
                LASSERT(lock);

                /* If the server gave us back a different lock mode, we should
                 * fix up our variables. */
                if (lock->l_req_mode != einfo->ei_mode) {
                        /* Take the new-mode reference before dropping the
                         * old-mode one. */
                        ldlm_lock_addref(lockh, lock->l_req_mode);
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        einfo->ei_mode = lock->l_req_mode;
                }
                /* Drop the reference taken by ldlm_handle2lock(). */
                LDLM_LOCK_PUT(lock);
        }

        lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                 sizeof(*lockrep));
        LASSERT(lockrep != NULL);  /* checked by ldlm_cli_enqueue() */
        /* swabbed by ldlm_cli_enqueue() */
        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));

        /* The server returns the intent disposition and status in the two
         * policy result fields of the lock reply. */
        it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
        it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
        it->d.lustre.it_lock_mode = einfo->ei_mode;
        it->d.lustre.it_lock_handle = lockh->cookie;
        it->d.lustre.it_data = req;

        /* A failed intent must not be replayed. */
        if (it->d.lustre.it_status < 0 && req->rq_replay)
                mdc_clear_replay_flag(req, it->d.lustre.it_status);

        /* If we're doing an IT_OPEN which did not result in an actual
         * successful open, then we need to remove the bit which saves
         * this request for unconditional replay.
         *
         * It's important that we do this first!  Otherwise we might exit the
         * function without doing so, and try to replay a failed create
         * (bug 3440) */
        if ((it->it_op & IT_OPEN) &&
            req->rq_replay &&
            (!it_disposition(it, DISP_OPEN_OPEN) ||
             it->d.lustre.it_status != 0))
                mdc_clear_replay_flag(req, it->d.lustre.it_status);

        DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
                  it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);

        /* We know what to expect, so we do any byte flipping required here */
        if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                struct mds_body *body;

                body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
                                         lustre_swab_mds_body);
                if (body == NULL) {
                        CERROR ("Can't swab mds_body\n");
                        RETURN (-EPROTO);
                }

                /* If this is a successful OPEN request, we need to set
                   replay handler and data early, so that if replay happens
                   immediately after swabbing below, new reply is swabbed
                   by that handler correctly */
                if (it_disposition(it, DISP_OPEN_OPEN) &&
                    !it_open_error(DISP_OPEN_OPEN, it))
                        mdc_set_open_replay_data(NULL, req);

                if ((body->valid & OBD_MD_FLEASIZE) != 0) {
                        void *eadata;

                        mdc_update_max_ea_from_body(exp, body);

                        /* The eadata is opaque; just check that it is there.
                         * Eventually, obd_unpackmd() will check the contents */
                        eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
                                                    body->eadatasize, NULL);
                        if (eadata == NULL) {
                                CERROR ("Missing/short eadata\n");
                                RETURN (-EPROTO);
                        }
                        /* We save the reply LOV EA in case we have to replay
                         * a create for recovery.  If we didn't allocate a
                         * large enough request buffer above we need to
                         * reallocate it here to hold the actual LOV EA. */
                        if (it->it_op & IT_OPEN) {
                                int offset = DLM_INTENT_REC_OFF;
                                void *lmm;

                                /* Index of the EA buffer in the request; it
                                 * is two slots further for a 2.0 server. */
                                if (mdc_req_is_2_0_server(req))
                                        offset += 4;
                                else
                                        offset += 2;

                                if (lustre_msg_buflen(req->rq_reqmsg, offset) !=
                                    body->eadatasize)
                                        mdc_realloc_openmsg(req, body);

                                /* mdc_realloc_openmsg() may have replaced
                                 * rq_reqmsg or zeroed body->eadatasize, so
                                 * both are re-read here. */
                                lmm = lustre_msg_buf(req->rq_reqmsg, offset,
                                                     body->eadatasize);
                                if (lmm)
                                        memcpy(lmm, eadata, body->eadatasize);
                        }
                }
        }

        RETURN(rc);
}
594
595 /* We always reserve enough space in the reply packet for a stripe MD, because
596  * we don't know in advance the file type. */
597 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
598                 struct lookup_intent *it, struct mdc_op_data *data,
599                 struct lustre_handle *lockh, void *lmm, int lmmsize,
600                 int extra_lock_flags)
601 {
602         struct ptlrpc_request *req;
603         struct obd_device *obddev = class_exp2obd(exp);
604         struct ldlm_res_id res_id;
605         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
606         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
607         int rc;
608         ENTRY;
609
610         fid_build_reg_res_name((void *)&data->fid1, &res_id);
611         LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
612         if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
613                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
614
615         if (it->it_op & IT_OPEN) {
616                 if ((it->it_op & IT_CREAT) && mdc_exp_is_2_0_server(exp)) {
617                         struct client_obd *cli = &obddev->u.cli;
618                         data->fid3 = data->fid2;
619                         rc = mdc_fid_alloc(cli->cl_seq, (void *)&data->fid2);
620                         if (rc) {
621                                 CERROR("fid allocation result: %d\n", rc);
622                                 RETURN(rc);
623                         }
624                 }
625                 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
626                 if (it->it_create_mode & M_JOIN_FILE) {
627                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
628                 }
629         } else if (it->it_op & IT_UNLINK) {
630                 req = mdc_intent_unlink_pack(exp, it, data);
631         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
632                 req = mdc_intent_lookup_pack(exp, it, data);
633         } else if (it->it_op == IT_READDIR) {
634                 req = mdc_intent_readdir_pack(exp);
635         } else {
636                 CERROR("bad it_op %x\n", it->it_op);
637                 RETURN(-EINVAL);
638         }
639
640         if (!req)
641                 RETURN(-ENOMEM);
642
643          /* It is important to obtain rpc_lock first (if applicable), so that
644           * threads that are serialised with rpc_lock are not polluting our
645           * rpcs in flight counter */
646         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
647         rc = mdc_enter_request(&obddev->u.cli);
648         if (rc == 0) {
649                 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags,
650                                       NULL, 0, NULL, lockh, 0);
651                 mdc_exit_request(&obddev->u.cli);
652                 if (rc < 0)
653                         CERROR("ldlm_cli_enqueue error: %d\n", rc);
654         }
655         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
656         if (rc < 0) {
657                 mdc_clear_replay_flag(req, rc);
658                 ptlrpc_req_finished(req);
659                 RETURN(rc);
660         }
661         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
662
663         RETURN(rc);
664 }
665 EXPORT_SYMBOL(mdc_enqueue);
666
667 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
668                         struct ll_fid *fid)
669 {
670                 /* We could just return 1 immediately, but since we should only
671                  * be called in revalidate_it if we already have a lock, let's
672                  * verify that. */
673         struct ldlm_res_id res_id;
674         struct lustre_handle lockh;
675         ldlm_policy_data_t policy;
676         ldlm_mode_t mode;
677         ENTRY;
678
679         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
680         /* As not all attributes are kept under update lock, e.g. 
681            owner/group/acls are under lookup lock, we need both 
682            ibits for GETATTR. */
683         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
684                 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
685                 MDS_INODELOCK_LOOKUP;
686
687         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
688                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
689                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
690         if (mode) {
691                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
692                 it->d.lustre.it_lock_mode = mode;
693         }
694
695         RETURN(!!mode);
696 }
697 EXPORT_SYMBOL(mdc_revalidate_lock);
698
699 static int mdc_finish_intent_lock(struct obd_export *exp,
700                                   struct ptlrpc_request *req,
701                                   struct mdc_op_data *data,
702                                   struct lookup_intent *it,
703                                   struct lustre_handle *lockh)
704 {
705         struct mds_body *mds_body;
706         struct lustre_handle old_lock;
707         struct ldlm_lock *lock;
708         int rc;
709         ENTRY;
710
711         LASSERT(req != NULL);
712         LASSERT(req != LP_POISON);
713         LASSERT(req->rq_repmsg != LP_POISON);
714
715         if (!it_disposition(it, DISP_IT_EXECD)) {
716                 /* The server failed before it even started executing the
717                  * intent, i.e. because it couldn't unpack the request. */
718                 LASSERT(it->d.lustre.it_status != 0);
719                 RETURN(it->d.lustre.it_status);
720         }
721         rc = it_open_error(DISP_IT_EXECD, it);
722         if (rc)
723                 RETURN(rc);
724
725         mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
726                                   sizeof(*mds_body));
727         /* mdc_enqueue checked */
728         LASSERT(mds_body != NULL);
729         /* mdc_enqueue swabbed */
730         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
731
732         /* If we were revalidating a fid/name pair, mark the intent in
733          * case we fail and get called again from lookup */
734
735         if (data->fid2.id && (it->it_op != IT_GETATTR) &&
736            ( !mdc_exp_is_2_0_server(exp) ||
737              (mdc_exp_is_2_0_server(exp) && (it->it_create_mode & M_CHECK_STALE)))) {
738                 it_set_disposition(it, DISP_ENQ_COMPLETE);
739
740                 /* Also: did we find the same inode? */
741                 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)) &&
742                     memcmp(&data->fid3, &mds_body->fid1, sizeof(data->fid3)))
743                         RETURN(-ESTALE);
744         }
745
746         rc = it_open_error(DISP_LOOKUP_EXECD, it);
747         if (rc)
748                 RETURN(rc);
749
750         /* keep requests around for the multiple phases of the call
751          * this shows the DISP_XX must guarantee we make it into the call
752          */
753         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
754             it_disposition(it, DISP_OPEN_CREATE) &&
755             !it_open_error(DISP_OPEN_CREATE, it)) {
756                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
757                 ptlrpc_request_addref(req); /* balanced in ll_create_node */
758         }
759         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
760             it_disposition(it, DISP_OPEN_OPEN) &&
761             !it_open_error(DISP_OPEN_OPEN, it)) {
762                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
763                 ptlrpc_request_addref(req); /* balanced in ll_file_open */
764                 /* BUG 11546 - eviction in the middle of open rpc processing */
765                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
766         }
767
768         if (it->it_op & IT_CREAT) {
769                 /* XXX this belongs in ll_create_it */
770         } else if (it->it_op == IT_OPEN) {
771                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
772         } else {
773                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
774         }
775
776         /* If we already have a matching lock, then cancel the new
777          * one.  We have to set the data here instead of in
778          * mdc_enqueue, because we need to use the child's inode as
779          * the l_ast_data to match, and that's not available until
780          * intent_finish has performed the iget().) */
781         lock = ldlm_handle2lock(lockh);
782         if (lock) {
783                 ldlm_policy_data_t policy = lock->l_policy_data;
784
785                 LDLM_DEBUG(lock, "matching against this");
786                 LDLM_LOCK_PUT(lock);
787                 memcpy(&old_lock, lockh, sizeof(*lockh));
788                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
789                                     LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
790                         ldlm_lock_decref_and_cancel(lockh,
791                                                     it->d.lustre.it_lock_mode);
792                         memcpy(lockh, &old_lock, sizeof(old_lock));
793                         memcpy(&it->d.lustre.it_lock_handle, lockh,
794                                sizeof(*lockh));
795                 }
796         }
797
798         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
799                data->namelen, data->name, ldlm_it2str(it->it_op),
800                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
801         RETURN(rc);
802 }
803
804 /* 
805  * This long block is all about fixing up the lock and request state
806  * so that it is correct as of the moment _before_ the operation was
807  * applied; that way, the VFS will think that everything is normal and
808  * call Lustre's regular VFS methods.
809  *
810  * If we're performing a creation, that means that unless the creation
811  * failed with EEXIST, we should fake up a negative dentry.
812  *
813  * For everything else, we want to lookup to succeed.
814  *
815  * One additional note: if CREATE or OPEN succeeded, we add an extra
816  * reference to the request because we need to keep it around until
817  * ll_create/ll_open gets called.
818  *
819  * The server will return to us, in it_disposition, an indication of
820  * exactly what d.lustre.it_status refers to.
821  *
822  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
823  * otherwise if DISP_OPEN_CREATE is set, then it status is the
824  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
825  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
826  * was successful.
827  *
828  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
829  * child lookup.
830  */
831 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
832                     void *lmm, int lmmsize, struct lookup_intent *it,
833                     int lookup_flags, struct ptlrpc_request **reqp,
834                     ldlm_blocking_callback cb_blocking, int extra_lock_flags)
835 {
836         struct lustre_handle lockh;
837         int rc;
838         ENTRY;
839
840         LASSERT(it);
841
842         CDEBUG(D_DLMTRACE,"name: %.*s("DFID") in inode ("DFID"), "
843                "intent: %s flags %#o\n",
844                op_data->namelen, op_data->name,
845                PFID(((void *)&op_data->fid2)),
846                PFID(((void *)&op_data->fid1)),
847                ldlm_it2str(it->it_op), it->it_flags);
848
849         lockh.cookie = 0;
850         if (op_data->fid2.id &&
851             (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
852                 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
853                 /* Only return failure if it was not GETATTR by cfid
854                    (from inode_revalidate) */
855                 if (rc || op_data->namelen != 0)
856                         RETURN(rc);
857         }
858
859         /* lookup_it may be called only after revalidate_it has run, because
860          * revalidate_it cannot return errors, only zero.  Returning zero causes
861          * this call to lookup, which *can* return an error.
862          *
863          * We only want to execute the request associated with the intent one
864          * time, however, so don't send the request again.  Instead, skip past
865          * this and use the request from revalidate.  In this case, revalidate
866          * never dropped its reference, so the refcounts are all OK */
867         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
868                 struct ldlm_enqueue_info einfo =
869                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
870                           ldlm_completion_ast, NULL, NULL };
871
872                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
873                                  lmm, lmmsize, extra_lock_flags);
874                 if (rc < 0)
875                         RETURN(rc);
876         } else if (!op_data->fid2.id) {
877                 /* DISP_ENQ_COMPLETE set means there is extra reference on
878                  * request referenced from this intent, saved for subsequent
879                  * lookup.  This path is executed when we proceed to this
880                  * lookup, so we clear DISP_ENQ_COMPLETE */
881                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
882         }
883
884         *reqp = it->d.lustre.it_data;
885         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
886
887         RETURN(rc);
888 }
889 EXPORT_SYMBOL(mdc_intent_lock);
890
891 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
892                                               void *args, int rc)
893 {
894         struct mdc_getattr_args  *ga = args;
895         struct obd_export        *exp = ga->ga_exp;
896         struct md_enqueue_info   *minfo = ga->ga_minfo;
897         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
898         struct lookup_intent     *it;
899         struct lustre_handle     *lockh;
900         struct obd_device        *obddev;
901         int                       flags = LDLM_FL_HAS_INTENT;
902         ENTRY;
903
904         it    = &minfo->mi_it;
905         lockh = &minfo->mi_lockh;
906
907         obddev = class_exp2obd(exp);
908
909         mdc_exit_request(&obddev->u.cli);
910         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
911                 rc = -ETIMEDOUT;
912
913         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
914                                    &flags, NULL, 0, NULL, lockh, rc);
915         if (rc < 0) {
916                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
917                 mdc_clear_replay_flag(req, rc);
918                 GOTO(out, rc);
919         }
920
921         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
922         if (rc)
923                 GOTO(out, rc);
924
925         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
926         GOTO(out, rc);
927 out:
928         OBD_FREE_PTR(einfo);
929         minfo->mi_cb(exp, req, minfo, rc);
930
931         return 0;
932 }
933
934 int mdc_intent_getattr_async(struct obd_export *exp,
935                              struct md_enqueue_info *minfo,
936                              struct ldlm_enqueue_info *einfo)
937 {
938         struct mdc_op_data      *op_data = &minfo->mi_data;
939         struct lookup_intent    *it = &minfo->mi_it;
940         struct ptlrpc_request   *req;
941         struct mdc_getattr_args *ga;
942         struct obd_device       *obddev = class_exp2obd(exp);
943         struct ldlm_res_id res_id;
944         ldlm_policy_data_t       policy = {
945                                         .l_inodebits = { MDS_INODELOCK_LOOKUP }
946                                  };
947         int                      rc;
948         int                      flags = LDLM_FL_HAS_INTENT;
949         ENTRY;
950
951         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
952                op_data->namelen, op_data->name, op_data->fid1.id,
953                ldlm_it2str(it->it_op), it->it_flags);
954
955         fid_build_reg_res_name((void *)&op_data->fid1, &res_id);
956         req = mdc_intent_lookup_pack(exp, it, op_data);
957         if (!req)
958                 RETURN(-ENOMEM);
959
960         rc = mdc_enter_request(&obddev->u.cli);
961         if (rc) {
962                 ptlrpc_req_finished(req);
963                 RETURN(rc);
964         }
965         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
966                               0, NULL, &minfo->mi_lockh, 1);
967         if (rc < 0) {
968                 mdc_exit_request(&obddev->u.cli);
969                 ptlrpc_req_finished(req);
970                 RETURN(rc);
971         }
972
973         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
974         ga = ptlrpc_req_async_args(req);
975         ga->ga_exp = exp;
976         ga->ga_minfo = minfo;
977         ga->ga_einfo = einfo;
978
979         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
980         ptlrpcd_add_req(req);
981
982         RETURN(0);
983 }
984 EXPORT_SYMBOL(mdc_intent_getattr_async);