Whamcloud - gitweb
b=20433 decrease the usage of memory on clients.
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <obd_class.h>
52 #include <lustre_dlm.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
56 int it_open_error(int phase, struct lookup_intent *it)
57 {
58         if (it_disposition(it, DISP_OPEN_OPEN)) {
59                 if (phase >= DISP_OPEN_OPEN)
60                         return it->d.lustre.it_status;
61                 else
62                         return 0;
63         }
64
65         if (it_disposition(it, DISP_OPEN_CREATE)) {
66                 if (phase >= DISP_OPEN_CREATE)
67                         return it->d.lustre.it_status;
68                 else
69                         return 0;
70         }
71
72         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
73                 if (phase >= DISP_LOOKUP_EXECD)
74                         return it->d.lustre.it_status;
75                 else
76                         return 0;
77         }
78
79         if (it_disposition(it, DISP_IT_EXECD)) {
80                 if (phase >= DISP_IT_EXECD)
81                         return it->d.lustre.it_status;
82                 else
83                         return 0;
84         }
85         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
86                it->d.lustre.it_status);
87         LBUG();
88         return 0;
89 }
90 EXPORT_SYMBOL(it_open_error);
91
92 /* this must be called on a lockh that is known to have a referenced lock */
93 void mdc_set_lock_data(__u64 *l, void *data, __u32 *bits)
94 {
95         struct ldlm_lock *lock;
96         struct lustre_handle *lockh = (struct lustre_handle *)l;
97         ENTRY;
98
99         if(bits)
100                 *bits = 0;
101
102         if (!*l) {
103                 EXIT;
104                 return;
105         }
106
107         lock = ldlm_handle2lock(lockh);
108
109         LASSERT(lock != NULL);
110         lock_res_and_lock(lock);
111 #ifdef __KERNEL__
112         if (lock->l_ast_data && lock->l_ast_data != data) {
113                 struct inode *new_inode = data;
114                 struct inode *old_inode = lock->l_ast_data;
115                 LASSERTF(old_inode->i_state & I_FREEING,
116                          "Found existing inode %p/%lu/%u state %lu in lock: "
117                          "setting data to %p/%lu/%u\n", old_inode,
118                          old_inode->i_ino, old_inode->i_generation,
119                          old_inode->i_state,
120                          new_inode, new_inode->i_ino, new_inode->i_generation);
121         }
122 #endif
123         lock->l_ast_data = data;
124         if (bits)
125                 *bits = lock->l_policy_data.l_inodebits.bits;
126         unlock_res_and_lock(lock);
127         LDLM_LOCK_PUT(lock);
128
129         EXIT;
130 }
131 EXPORT_SYMBOL(mdc_set_lock_data);
132
133 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, 
134                       ldlm_iterator_t it, void *data)
135 {
136         struct ldlm_res_id res_id;
137         ENTRY;
138
139         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
140         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
141                               it, data);
142
143         EXIT;
144         return 0;
145 }
146
147 /* find any ldlm lock of the inode in mdc
148  * return 0    not find
149  *        1    find one
150  *      < 0    error */
151 int mdc_find_cbdata(struct obd_export *exp, struct ll_fid *fid,
152                     ldlm_iterator_t it, void *data)
153 {
154         struct ldlm_res_id res_id;
155         int rc = 0;
156         ENTRY;
157
158         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
159         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
160                                    it, data);
161         if (rc == LDLM_ITER_STOP)
162                 RETURN(1);
163         else if (rc == LDLM_ITER_CONTINUE)
164                 RETURN(0);
165         RETURN(rc);
166 }
167
168 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
169 {
170         /* Don't hold error requests for replay. */
171         if (req->rq_replay) {
172                 spin_lock(&req->rq_lock);
173                 req->rq_replay = 0;
174                 spin_unlock(&req->rq_lock);
175         }
176         if (rc && req->rq_transno != 0) {
177                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
178                 LBUG();
179         }
180 }
181
/*
 * Return the smallest power of two strictly greater than @val
 * (0 -> 1, 1 -> 2, 3 -> 4, 4 -> 8, 1000 -> 1024).
 *
 * Values <= 0 yield 1.  Previously a negative value never reached zero
 * under arithmetic right shift, so the loop did not terminate and the
 * result was shifted into signed overflow.
 *
 * The result saturates at the largest power of two that fits in an int
 * instead of overflowing when @val itself is that large or larger.
 */
static int round_up(int val)
{
        /* largest power of two representable in a non-negative int */
        const unsigned int top = ((~0U >> 1) >> 1) + 1;
        unsigned int rest;
        unsigned int ret = 1;

        if (val <= 0)
                return 1;

        /* one doubling per significant bit of val, in unsigned arithmetic */
        for (rest = (unsigned int)val; rest != 0 && ret < top; rest >>= 1)
                ret <<= 1;

        return (int)ret;
}
191
192 /* Save a large LOV EA into the request buffer so that it is available
193  * for replay.  We don't do this in the initial request because the
194  * original request doesn't need this buffer (at most it sends just the
195  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
196  * buffer and may also be difficult to allocate and save a very large
197  * request buffer for each open. (bug 5707)
198  *
199  * OOM here may cause recovery failure if lmm is needed (only for the
200  * original open if the MDS crashed just when this client also OOM'd)
201  * but this is incredibly unlikely, and questionable whether the client
202  * could do MDS recovery under OOM anyways... */
203 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
204                                 struct mds_body *body)
205 {
206         int old_len, new_size, old_size;
207         struct lustre_msg *old_msg = req->rq_reqmsg;
208         struct lustre_msg *new_msg;
209         int offset;
210
211         if (mdc_req_is_2_0_server(req))
212                 offset = 4;
213         else
214                 offset = 2;
215
216         old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + offset);
217         old_size = lustre_packed_msg_size(old_msg);
218         lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + offset,
219                               body->eadatasize);
220         /* old buffer is more then need */
221         if (old_len > body->eadatasize)
222                 return;
223
224         new_size = lustre_packed_msg_size(old_msg);
225
226         OBD_ALLOC(new_msg, new_size);
227         if (new_msg != NULL) {
228                 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
229                           body->eadatasize);
230                 memcpy(new_msg, old_msg, old_size);
231
232                 spin_lock(&req->rq_lock);
233                 req->rq_reqmsg = new_msg;
234                 req->rq_reqlen = new_size;
235                 spin_unlock(&req->rq_lock);
236
237                 OBD_FREE(old_msg, old_size);
238         } else {
239                 lustre_msg_set_buflen(old_msg,
240                                       DLM_INTENT_REC_OFF + offset, old_len);
241                 body->valid &= ~OBD_MD_FLEASIZE;
242                 body->eadatasize = 0;
243         }
244 }
245
/*
 * Build the LDLM enqueue request that carries an open intent.
 *
 * Steps, in order:
 *  - lay out the request buffer sizes (ptlrpc body, lock request, intent,
 *    create record, name, LOV EA); when mdc_exp_is_2_0_server() is true the
 *    name and EA move two slots up and the two freed slots become
 *    zero-length capa buffers, and two capa-sized reply buffers are added;
 *  - pad the last buffer (the EA slot) when the packed size is not a power
 *    of two, capped at cl_max_mds_easize;
 *  - gather unused locks on fid2 (OPEN bit) and fid1 (UPDATE bit) into
 *    'cancels', which is handed to ldlm_prep_enqueue_req();
 *  - for a join (M_JOIN_FILE with data->data set) append a join record;
 *  - pack the intent opcode and the open record, set the reply sizes.
 *
 * Side effects on @it: the S_IFMT bits of it_create_mode are replaced by
 * S_IFREG, and M_JOIN_FILE is cleared when no join is performed.
 *
 * Returns whatever ldlm_prep_enqueue_req() returned; if that is NULL
 * nothing is packed.
 */
246 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
247                                                    struct lookup_intent *it,
248                                                    struct mdc_op_data *data,
249                                                    void *lmm, __u32 lmmsize)
250 {
251         struct ptlrpc_request *req;
252         struct ldlm_intent *lit;
253         struct obd_device *obddev = class_exp2obd(exp);
254         __u32 size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
255                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
256                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
257                         [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_create),
258                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
259                         /* As an optimization, we allocate an RPC request buffer
260                          * for at least a default-sized LOV EA even if we aren't
261                          * sending one.  We grow the whole request to the next
262                          * power-of-two size since we get that much from a slab
263                          * allocation anyways. This avoids an allocation below
264                          * in the common case where we need to save a
265                          * default-sized LOV EA for open replay. */
266                         [DLM_INTENT_REC_OFF+2]= max(lmmsize,
267                                          obddev->u.cli.cl_default_mds_easize) };
268         __u32 repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
269                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
270                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
271                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
272                                                         cl_max_mds_easize,
273                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
274         CFS_LIST_HEAD(cancels);
275         int do_join = (it->it_create_mode & M_JOIN_FILE) && data->data;
276         int count = 0;
277         int bufcount = 6;
278         int repbufcount = 5;
279         int mode;
280         int rc;
281         ENTRY;
282
        /* force the file type bits of the create mode to "regular file" */
283         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
        /* 2.0 server layout: the order of the assignments below matters,
         * name and EA sizes are moved up before their old slots are zeroed */
284         if (mdc_exp_is_2_0_server(exp)) {
285                 size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create);
286                 size[DLM_INTENT_REC_OFF+4] = size[DLM_INTENT_REC_OFF+2];
287                 size[DLM_INTENT_REC_OFF+3] = size[DLM_INTENT_REC_OFF+1];
288                 size[DLM_INTENT_REC_OFF+2] = 0; /* capa */
289                 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
290                 bufcount = 8;
291                 repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa);
292                 repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa);
293                 repbufcount = 7;
294         }
        /* here rc holds the packed message size, not an error code; if it
         * is not a power of two, the last buffer (the EA slot) is grown by
         * the distance to the value round_up() returns, but never beyond
         * cl_max_mds_easize */
295         rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
296                              bufcount, size);
297         if (rc & (rc - 1))
298                 size[bufcount - 1] = min(size[bufcount - 1] + round_up(rc) - rc,
299                                          (__u32)obddev->u.cli.cl_max_mds_easize);
300
301         /* If inode is known, cancel conflicting OPEN locks. */
302         if (data->fid2.id) {
303                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
304                         mode = LCK_CW;
305 #ifdef FMODE_EXEC
306                 else if (it->it_flags & FMODE_EXEC)
307                         mode = LCK_PR;
308 #endif
309                 else
310                         mode = LCK_CR;
311                 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
312                                                 mode, MDS_INODELOCK_OPEN);
313         }
314
315         /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
316         if (it->it_op & IT_CREAT || do_join)
317                 mode = LCK_EX;
318         else
319                 mode = LCK_CR;
320         count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
321                                          MDS_INODELOCK_UPDATE);
322         if (do_join) {
                /* data->data is read as the 64-bit size of the head file */
323                 __u64 head_size = (*(__u64 *)data->data);
324                 /* join is like an unlink of the tail */
325                 if (mdc_exp_is_2_0_server(exp)) {
326                         size[DLM_INTENT_REC_OFF+5]=sizeof(struct mdt_rec_join);
327                 } else {
328                         size[DLM_INTENT_REC_OFF+3]=sizeof(struct mds_rec_join);
329                 }
                /* the join record is the extra, last request buffer */
330                 bufcount++;
331
332                 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
333                 if (req)
334                         mdc_join_pack(req, bufcount - 1, data, head_size);
335         } else {
336                 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
337                 it->it_create_mode &= ~M_JOIN_FILE;
338         }
339
340         if (req) {
                /* keep the request for replay only if the import is
                 * replayable */
341                 spin_lock(&req->rq_lock);
342                 req->rq_replay = req->rq_import->imp_replayable;
343                 spin_unlock(&req->rq_lock);
344
345                 /* pack the intent */
346                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
347                                      sizeof(*lit));
348                 lit->opc = (__u64)it->it_op;
349
350                 /* pack the intended request */
351                 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
352                               it->it_create_mode, 0, it->it_flags,
353                               lmm, lmmsize);
354
355                 ptlrpc_req_set_repsize(req, repbufcount, repsize);
356         }
357         RETURN(req);
358 }
359
360 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
361                                                      struct lookup_intent *it,
362                                                      struct mdc_op_data *data)
363 {
364         struct ptlrpc_request *req;
365         struct ldlm_intent *lit;
366         struct obd_device *obddev = class_exp2obd(exp);
367         __u32 size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
368                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
369                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
370                         [DLM_INTENT_REC_OFF]  = mdc_exp_is_2_0_server(exp) ?
371                                                 sizeof(struct mdt_rec_unlink) :
372                                                 sizeof(struct mds_rec_unlink),
373                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
374         __u32 repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
375                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
376                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
377                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
378                                                         cl_max_mds_easize,
379                            [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
380                                                         cl_max_mds_cookiesize };
381         ENTRY;
382
383         req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
384         if (req) {
385                 /* pack the intent */
386                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
387                                      sizeof(*lit));
388                 lit->opc = (__u64)it->it_op;
389
390                 /* pack the intended request */
391                 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
392
393                 ptlrpc_req_set_repsize(req, 5, repsize);
394         }
395         RETURN(req);
396 }
397
398 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
399                                                      struct lookup_intent *it,
400                                                      struct mdc_op_data *data)
401 {
402         struct ptlrpc_request *req;
403         struct ldlm_intent *lit;
404         struct obd_device *obddev = class_exp2obd(exp);
405         __u32 size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
406                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
407                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
408                         [DLM_INTENT_REC_OFF]  = sizeof(struct mdt_body),
409                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
410                         [DLM_INTENT_REC_OFF+2]= 0 };
411         __u32 repsize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
412                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
413                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
414                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
415                                                         cl_max_mds_easize,
416                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
417                            [DLM_REPLY_REC_OFF+3] = 0 };
418         obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
419                           OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
420         int bufcount = 5;
421         ENTRY;
422
423         if (mdc_exp_is_2_0_server(exp)) {
424                 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
425                 size[DLM_INTENT_REC_OFF+2] = data->namelen + 1;
426                 bufcount = 6;
427         }
428         req = ldlm_prep_enqueue_req(exp, bufcount, size, NULL, 0);
429         if (req) {
430                 /* pack the intent */
431                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
432                                      sizeof(*lit));
433                 lit->opc = (__u64)it->it_op;
434
435                 /* pack the intended request */
436                 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
437                                  data);
438                 ptlrpc_req_set_repsize(req, bufcount, repsize);
439         }
440         RETURN(req);
441 }
442
443 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
444 {
445         struct ptlrpc_request *req;
446         __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
447                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
448         __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
449                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
450                            [DLM_REPLY_REC_OFF] = sizeof(struct ost_lvb) };
451         ENTRY;
452
453         req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
454         if (req)
455                 ptlrpc_req_set_repsize(req, 3, repsize);
456         RETURN(req);
457 }
458
/*
 * Post-process a completed intent enqueue.
 *
 * @rc must be non-negative (asserted); mdc_enqueue() passes the result of
 * ldlm_cli_enqueue() here.
 *
 * In order:
 *  - mark the lock request LDLM_FL_INTENT_ONLY if the request has a transno
 *    or is flagged for replay;
 *  - for ELDLM_LOCK_ABORTED clear the lock mode and handle and turn rc into
 *    0, otherwise fix up the references if the lock's l_req_mode differs
 *    from the mode recorded in @einfo;
 *  - copy disposition, status, lock mode, lock cookie and the request
 *    pointer into @it;
 *  - drop the replay flag when the status is negative, or when an IT_OPEN
 *    did not end in a successful open;
 *  - for OPEN/UNLINK/LOOKUP/GETATTR swab the reply mds_body, check that the
 *    EA announced by the body is present and, for OPEN, copy it into the
 *    request buffer.
 *
 * Returns rc (0 after ELDLM_LOCK_ABORTED), or -EPROTO when the reply body
 * or the EA buffer cannot be obtained.
 */
459 static int mdc_finish_enqueue(struct obd_export *exp,
460                               struct ptlrpc_request *req,
461                               struct ldlm_enqueue_info *einfo,
462                               struct lookup_intent *it,
463                               struct lustre_handle *lockh,
464                               int rc)
465 {
466         struct ldlm_request *lockreq;
467         struct ldlm_reply *lockrep;
468         ENTRY;
469
470         LASSERT(rc >= 0);
471         /* Similarly, if we're going to replay this request, we don't want to
472          * actually get a lock, just perform the intent. */
473         if (req->rq_transno || req->rq_replay) {
474                 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
475                                          sizeof(*lockreq));
476                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
477         }
478
479         if (rc == ELDLM_LOCK_ABORTED) {
                /* no lock: clear mode and handle, report success */
480                 einfo->ei_mode = 0;
481                 memset(lockh, 0, sizeof(*lockh));
482                 rc = 0;
483         } else { /* rc = 0 */
484                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
485                 LASSERT(lock);
486
487                 /* If the server gave us back a different lock mode, we should
488                  * fix up our variables. */
489                 if (lock->l_req_mode != einfo->ei_mode) {
490                         ldlm_lock_addref(lockh, lock->l_req_mode);
491                         ldlm_lock_decref(lockh, einfo->ei_mode);
492                         einfo->ei_mode = lock->l_req_mode;
493                 }
494                 LDLM_LOCK_PUT(lock);
495         }
496
497         lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
498                                  sizeof(*lockrep));
499         LASSERT(lockrep != NULL);  /* checked by ldlm_cli_enqueue() */
500         /* swabbed by ldlm_cli_enqueue() */
501         LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
502
        /* the intent result travels in the policy result fields of the lock
         * reply; the intent also keeps a pointer to the request */
503         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
504         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
505         it->d.lustre.it_lock_mode = einfo->ei_mode;
506         it->d.lustre.it_lock_handle = lockh->cookie;
507         it->d.lustre.it_data = req;
508
509         if (it->d.lustre.it_status < 0 && req->rq_replay)
510                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
511
512         /* If we're doing an IT_OPEN which did not result in an actual
513          * successful open, then we need to remove the bit which saves
514          * this request for unconditional replay.
515          *
516          * It's important that we do this first!  Otherwise we might exit the
517          * function without doing so, and try to replay a failed create
518          * (bug 3440) */
519         if ((it->it_op & IT_OPEN) &&
520             req->rq_replay &&
521             (!it_disposition(it, DISP_OPEN_OPEN) ||
522              it->d.lustre.it_status != 0))
523                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
524
525         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
526                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
527
528         /* We know what to expect, so we do any byte flipping required here */
529         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
530                 struct mds_body *body;
531
532                 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
533                                          lustre_swab_mds_body);
534                 if (body == NULL) {
535                         CERROR ("Can't swab mds_body\n");
536                         RETURN (-EPROTO);
537                 }
538
539                 /* If this is a successful OPEN request, we need to set
540                    replay handler and data early, so that if replay happens
541                    immediately after swabbing below, new reply is swabbed
542                    by that handler correctly */
543                 if (it_disposition(it, DISP_OPEN_OPEN) &&
544                     !it_open_error(DISP_OPEN_OPEN, it))
545                         mdc_set_open_replay_data(NULL, req);
546
547                 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
548                         void *eadata;
549
550                         mdc_update_max_ea_from_body(exp, body);
551
552                         /* The eadata is opaque; just check that it is there.
553                          * Eventually, obd_unpackmd() will check the contents */
554                         eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
555                                                     body->eadatasize, NULL);
556                         if (eadata == NULL) {
557                                 CERROR ("Missing/short eadata\n");
558                                 RETURN (-EPROTO);
559                         }
560                         /* We save the reply LOV EA in case we have to replay
561                          * a create for recovery.  If we didn't allocate a
562                          * large enough request buffer above we need to
563                          * reallocate it here to hold the actual LOV EA. */
564                         if (it->it_op & IT_OPEN) {
565                                 int offset = DLM_INTENT_REC_OFF;
566                                 void *lmm;
567
                                /* index of the EA buffer in the request; it
                                 * sits two slots further back for a 2.0
                                 * server */
568                                 if (mdc_req_is_2_0_server(req))
569                                         offset += 4;
570                                 else
571                                         offset += 2;
572
573                                 if (lustre_msg_buflen(req->rq_reqmsg, offset) !=
574                                     body->eadatasize)
575                                         mdc_realloc_openmsg(req, body);
576
                                /* mdc_realloc_openmsg() may have zeroed
                                 * body->eadatasize on allocation failure;
                                 * the copy below then moves 0 bytes */
577                                 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
578                                                      body->eadatasize);
579                                 if (lmm)
580                                         memcpy(lmm, eadata, body->eadatasize);
581                         }
582                 }
583         }
584
585         RETURN(rc);
586 }
587
588 /* We always reserve enough space in the reply packet for a stripe MD, because
589  * we don't know in advance the file type. */
590 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
591                 struct lookup_intent *it, struct mdc_op_data *data,
592                 struct lustre_handle *lockh, void *lmm, int lmmsize,
593                 int extra_lock_flags)
594 {
595         struct ptlrpc_request *req;
596         struct obd_device *obddev = class_exp2obd(exp);
597         struct ldlm_res_id res_id;
598         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
599         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
600         int rc;
601         ENTRY;
602
603         fid_build_reg_res_name((void *)&data->fid1, &res_id);
604         LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
605         if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
606                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
607
608         if (it->it_op & IT_OPEN) {
609                 if ((it->it_op & IT_CREAT) && mdc_exp_is_2_0_server(exp)) {
610                         struct client_obd *cli = &obddev->u.cli;
611                         data->fid3 = data->fid2;
612                         rc = mdc_fid_alloc(cli->cl_seq, (void *)&data->fid2);
613                         if (rc) {
614                                 CERROR("fid allocation result: %d\n", rc);
615                                 RETURN(rc);
616                         }
617                 }
618                 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
619                 if (it->it_create_mode & M_JOIN_FILE) {
620                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
621                 }
622         } else if (it->it_op & IT_UNLINK) {
623                 req = mdc_intent_unlink_pack(exp, it, data);
624         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
625                 req = mdc_intent_lookup_pack(exp, it, data);
626         } else if (it->it_op == IT_READDIR) {
627                 req = mdc_intent_readdir_pack(exp);
628         } else {
629                 CERROR("bad it_op %x\n", it->it_op);
630                 RETURN(-EINVAL);
631         }
632
633         if (!req)
634                 RETURN(-ENOMEM);
635
636          /* It is important to obtain rpc_lock first (if applicable), so that
637           * threads that are serialised with rpc_lock are not polluting our
638           * rpcs in flight counter */
639         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
640         mdc_enter_request(&obddev->u.cli);
641         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
642                               0, NULL, lockh, 0);
643         mdc_exit_request(&obddev->u.cli);
644         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
645         if (rc < 0) {
646                 CERROR("ldlm_cli_enqueue: %d\n", rc);
647                 mdc_clear_replay_flag(req, rc);
648                 ptlrpc_req_finished(req);
649                 RETURN(rc);
650         }
651         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
652
653         RETURN(rc);
654 }
655 EXPORT_SYMBOL(mdc_enqueue);
656
657 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
658                         struct ll_fid *fid)
659 {
660                 /* We could just return 1 immediately, but since we should only
661                  * be called in revalidate_it if we already have a lock, let's
662                  * verify that. */
663         struct ldlm_res_id res_id;
664         struct lustre_handle lockh;
665         ldlm_policy_data_t policy;
666         ldlm_mode_t mode;
667         ENTRY;
668
669         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
670         /* As not all attributes are kept under update lock, e.g. 
671            owner/group/acls are under lookup lock, we need both 
672            ibits for GETATTR. */
673         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
674                 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
675                 MDS_INODELOCK_LOOKUP;
676
677         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
678                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
679                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
680         if (mode) {
681                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
682                 it->d.lustre.it_lock_mode = mode;
683         }
684
685         RETURN(!!mode);
686 }
687 EXPORT_SYMBOL(mdc_revalidate_lock);
688
689 static int mdc_finish_intent_lock(struct obd_export *exp,
690                                   struct ptlrpc_request *req,
691                                   struct mdc_op_data *data,
692                                   struct lookup_intent *it,
693                                   struct lustre_handle *lockh)
694 {
695         struct mds_body *mds_body;
696         struct lustre_handle old_lock;
697         struct ldlm_lock *lock;
698         int rc;
699         ENTRY;
700
701         LASSERT(req != NULL);
702         LASSERT(req != LP_POISON);
703         LASSERT(req->rq_repmsg != LP_POISON);
704
705         if (!it_disposition(it, DISP_IT_EXECD)) {
706                 /* The server failed before it even started executing the
707                  * intent, i.e. because it couldn't unpack the request. */
708                 LASSERT(it->d.lustre.it_status != 0);
709                 RETURN(it->d.lustre.it_status);
710         }
711         rc = it_open_error(DISP_IT_EXECD, it);
712         if (rc)
713                 RETURN(rc);
714
715         mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
716                                   sizeof(*mds_body));
717         /* mdc_enqueue checked */
718         LASSERT(mds_body != NULL);
719         /* mdc_enqueue swabbed */
720         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
721
722         /* If we were revalidating a fid/name pair, mark the intent in
723          * case we fail and get called again from lookup */
724
725         if (data->fid2.id && (it->it_op != IT_GETATTR) &&
726            ( !mdc_exp_is_2_0_server(exp) ||
727              (mdc_exp_is_2_0_server(exp) && (it->it_create_mode & M_CHECK_STALE)))) {
728                 it_set_disposition(it, DISP_ENQ_COMPLETE);
729
730                 /* Also: did we find the same inode? */
731                 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)) &&
732                     memcmp(&data->fid3, &mds_body->fid1, sizeof(data->fid3)))
733                         RETURN(-ESTALE);
734         }
735
736         rc = it_open_error(DISP_LOOKUP_EXECD, it);
737         if (rc)
738                 RETURN(rc);
739
740         /* keep requests around for the multiple phases of the call
741          * this shows the DISP_XX must guarantee we make it into the call
742          */
743         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
744             it_disposition(it, DISP_OPEN_CREATE) &&
745             !it_open_error(DISP_OPEN_CREATE, it)) {
746                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
747                 ptlrpc_request_addref(req); /* balanced in ll_create_node */
748         }
749         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
750             it_disposition(it, DISP_OPEN_OPEN) &&
751             !it_open_error(DISP_OPEN_OPEN, it)) {
752                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
753                 ptlrpc_request_addref(req); /* balanced in ll_file_open */
754                 /* BUG 11546 - eviction in the middle of open rpc processing */
755                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
756         }
757
758         if (it->it_op & IT_CREAT) {
759                 /* XXX this belongs in ll_create_it */
760         } else if (it->it_op == IT_OPEN) {
761                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
762         } else {
763                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
764         }
765
766         /* If we already have a matching lock, then cancel the new
767          * one.  We have to set the data here instead of in
768          * mdc_enqueue, because we need to use the child's inode as
769          * the l_ast_data to match, and that's not available until
770          * intent_finish has performed the iget().) */
771         lock = ldlm_handle2lock(lockh);
772         if (lock) {
773                 ldlm_policy_data_t policy = lock->l_policy_data;
774
775                 LDLM_DEBUG(lock, "matching against this");
776                 LDLM_LOCK_PUT(lock);
777                 memcpy(&old_lock, lockh, sizeof(*lockh));
778                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
779                                     LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
780                         ldlm_lock_decref_and_cancel(lockh,
781                                                     it->d.lustre.it_lock_mode);
782                         memcpy(lockh, &old_lock, sizeof(old_lock));
783                         memcpy(&it->d.lustre.it_lock_handle, lockh,
784                                sizeof(*lockh));
785                 }
786         }
787
788         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
789                data->namelen, data->name, ldlm_it2str(it->it_op),
790                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
791         RETURN(rc);
792 }
793
794 /* 
795  * This long block is all about fixing up the lock and request state
796  * so that it is correct as of the moment _before_ the operation was
797  * applied; that way, the VFS will think that everything is normal and
798  * call Lustre's regular VFS methods.
799  *
800  * If we're performing a creation, that means that unless the creation
801  * failed with EEXIST, we should fake up a negative dentry.
802  *
803  * For everything else, we want to lookup to succeed.
804  *
805  * One additional note: if CREATE or OPEN succeeded, we add an extra
806  * reference to the request because we need to keep it around until
807  * ll_create/ll_open gets called.
808  *
809  * The server will return to us, in it_disposition, an indication of
810  * exactly what d.lustre.it_status refers to.
811  *
812  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
813  * otherwise if DISP_OPEN_CREATE is set, then it status is the
814  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
815  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
816  * was successful.
817  *
818  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
819  * child lookup.
820  */
821 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
822                     void *lmm, int lmmsize, struct lookup_intent *it,
823                     int lookup_flags, struct ptlrpc_request **reqp,
824                     ldlm_blocking_callback cb_blocking, int extra_lock_flags)
825 {
826         struct lustre_handle lockh;
827         int rc;
828         ENTRY;
829
830         LASSERT(it);
831
832         CDEBUG(D_DLMTRACE,"name: %.*s("DFID") in inode ("DFID"), "
833                "intent: %s flags %#o\n",
834                op_data->namelen, op_data->name,
835                PFID(((void *)&op_data->fid2)),
836                PFID(((void *)&op_data->fid1)),
837                ldlm_it2str(it->it_op), it->it_flags);
838
839         lockh.cookie = 0;
840         if (op_data->fid2.id &&
841             (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
842                 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
843                 /* Only return failure if it was not GETATTR by cfid
844                    (from inode_revalidate) */
845                 if (rc || op_data->namelen != 0)
846                         RETURN(rc);
847         }
848
849         /* lookup_it may be called only after revalidate_it has run, because
850          * revalidate_it cannot return errors, only zero.  Returning zero causes
851          * this call to lookup, which *can* return an error.
852          *
853          * We only want to execute the request associated with the intent one
854          * time, however, so don't send the request again.  Instead, skip past
855          * this and use the request from revalidate.  In this case, revalidate
856          * never dropped its reference, so the refcounts are all OK */
857         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
858                 struct ldlm_enqueue_info einfo =
859                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
860                           ldlm_completion_ast, NULL, NULL };
861
862                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
863                                  lmm, lmmsize, extra_lock_flags);
864                 if (rc < 0)
865                         RETURN(rc);
866         } else if (!op_data->fid2.id) {
867                 /* DISP_ENQ_COMPLETE set means there is extra reference on
868                  * request referenced from this intent, saved for subsequent
869                  * lookup.  This path is executed when we proceed to this
870                  * lookup, so we clear DISP_ENQ_COMPLETE */
871                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
872         }
873
874         *reqp = it->d.lustre.it_data;
875         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
876
877         RETURN(rc);
878 }
879 EXPORT_SYMBOL(mdc_intent_lock);
880
881 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
882                                               void *unused, int rc)
883 {
884         struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
885         struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
886         struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
887         struct lookup_intent     *it;
888         struct lustre_handle     *lockh;
889         struct obd_device        *obddev;
890         int                       flags = LDLM_FL_HAS_INTENT;
891         ENTRY;
892
893         it    = &minfo->mi_it;
894         lockh = &minfo->mi_lockh;
895
896         obddev = class_exp2obd(exp);
897
898         mdc_exit_request(&obddev->u.cli);
899         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
900                 rc = -ETIMEDOUT;
901
902         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
903                                    &flags, NULL, 0, NULL, lockh, rc);
904         if (rc < 0) {
905                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
906                 mdc_clear_replay_flag(req, rc);
907                 GOTO(out, rc);
908         }
909
910         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
911         if (rc)
912                 GOTO(out, rc);
913
914         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
915         GOTO(out, rc);
916 out:
917         OBD_FREE_PTR(einfo);
918         minfo->mi_cb(exp, req, minfo, rc);
919
920         return 0;
921 }
922
923 int mdc_intent_getattr_async(struct obd_export *exp,
924                              struct md_enqueue_info *minfo,
925                              struct ldlm_enqueue_info *einfo)
926 {
927         struct mdc_op_data      *op_data = &minfo->mi_data;
928         struct lookup_intent    *it = &minfo->mi_it;
929         struct ptlrpc_request   *req;
930         struct obd_device       *obddev = class_exp2obd(exp);
931         struct ldlm_res_id res_id;
932         ldlm_policy_data_t       policy = {
933                                         .l_inodebits = { MDS_INODELOCK_LOOKUP }
934                                  };
935         int                      rc;
936         int                      flags = LDLM_FL_HAS_INTENT;
937         ENTRY;
938
939         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
940                op_data->namelen, op_data->name, op_data->fid1.id,
941                ldlm_it2str(it->it_op), it->it_flags);
942
943         fid_build_reg_res_name((void *)&op_data->fid1, &res_id);
944         req = mdc_intent_lookup_pack(exp, it, op_data);
945         if (!req)
946                 RETURN(-ENOMEM);
947
948         mdc_enter_request(&obddev->u.cli);
949         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
950                               0, NULL, &minfo->mi_lockh, 1);
951         if (rc < 0) {
952                 mdc_exit_request(&obddev->u.cli);
953                 RETURN(rc);
954         }
955
956         req->rq_async_args.pointer_arg[0] = exp;
957         req->rq_async_args.pointer_arg[1] = minfo;
958         req->rq_async_args.pointer_arg[2] = einfo;
959         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
960         ptlrpcd_add_req(req);
961
962         RETURN(0);
963 }
964 EXPORT_SYMBOL(mdc_intent_getattr_async);