/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_MDC

#ifdef __KERNEL__
# include <linux/module.h>
# include <linux/pagemap.h>
# include <linux/miscdevice.h>
# include <linux/init.h>
#else
# include <liblustre.h>
#endif

#include <obd_class.h>
#include <lustre_dlm.h>
#include <lprocfs_status.h>
#include "mdc_internal.h"

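/* Helpers for the intent disposition bitmask.  The MDS reports in
 * it_disposition which parts of an intent (lookup, open, create, ...)
 * were actually executed; these accessors test, set and clear those bits. */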
int it_disposition(struct lookup_intent *it, int flag)
{
        return it->d.lustre.it_disposition & flag;
}
EXPORT_SYMBOL(it_disposition);

void it_set_disposition(struct lookup_intent *it, int flag)
{
        it->d.lustre.it_disposition |= flag;
}
EXPORT_SYMBOL(it_set_disposition);

void it_clear_disposition(struct lookup_intent *it, int flag)
{
        it->d.lustre.it_disposition &= ~flag;
}
EXPORT_SYMBOL(it_clear_disposition);

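/* Return it_status if intent execution reached the phase the caller is
 * asking about; if the recorded error belongs to a later phase, the earlier
 * phase succeeded and 0 is returned.  LBUG()s if the reply carries no
 * disposition bits at all. */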
int it_open_error(int phase, struct lookup_intent *it)
{
        if (it_disposition(it, DISP_OPEN_OPEN)) {
                if (phase >= DISP_OPEN_OPEN)
                        return it->d.lustre.it_status;
                else
                        return 0;
        }

        if (it_disposition(it, DISP_OPEN_CREATE)) {
                if (phase >= DISP_OPEN_CREATE)
                        return it->d.lustre.it_status;
                else
                        return 0;
        }

        if (it_disposition(it, DISP_LOOKUP_EXECD)) {
                if (phase >= DISP_LOOKUP_EXECD)
                        return it->d.lustre.it_status;
                else
                        return 0;
        }

        if (it_disposition(it, DISP_IT_EXECD)) {
                if (phase >= DISP_IT_EXECD)
                        return it->d.lustre.it_status;
                else
                        return 0;
        }
        CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
               it->d.lustre.it_status);
        LBUG();
        return 0;
}
EXPORT_SYMBOL(it_open_error);

/* this must be called on a lockh that is known to have a referenced lock */
void mdc_set_lock_data(__u64 *l, void *data)
{
        struct ldlm_lock *lock;
        struct lustre_handle *lockh = (struct lustre_handle *)l;
        ENTRY;

        if (!*l) {
                EXIT;
                return;
        }

        lock = ldlm_handle2lock(lockh);

        LASSERT(lock != NULL);
        lock_res_and_lock(lock);
#ifdef __KERNEL__
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);

        EXIT;
}
EXPORT_SYMBOL(mdc_set_lock_data);

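/* Apply @it to every lock in the DLM resource named by @fid, typically to
 * change or clear the l_ast_data (inode pointer) attached to cached
 * metadata locks before the inode goes away. */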
int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid,
                      ldlm_iterator_t it, void *data)
{
        struct ldlm_res_id res_id;
        ENTRY;

        fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
        ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
                              it, data);

        EXIT;
        return 0;
}

static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
{
        /* Don't hold error requests for replay. */
        if (req->rq_replay) {
                spin_lock(&req->rq_lock);
                req->rq_replay = 0;
                spin_unlock(&req->rq_lock);
        }
        if (rc && req->rq_transno != 0) {
                DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
                LBUG();
        }
}

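/* Return the smallest power of two strictly greater than @val (1 for 0).
 * Used below to pad the open request up to a full power-of-two allocation. */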
static int round_up(int val)
{
        int ret = 1;
        while (val) {
                val >>= 1;
                ret <<= 1;
        }
        return ret;
}

/* Save a large LOV EA into the request buffer so that it is available
 * for replay.  We don't do this in the initial request because the
 * original request doesn't need this buffer (at most it sends just the
 * lov_mds_md), it is a waste of RAM/bandwidth to send the empty buffer,
 * and it may also be difficult to allocate and save a very large
 * request buffer for each open. (bug 5707)
 *
 * OOM here may cause recovery failure if lmm is needed (only for the
 * original open if the MDS crashed just when this client also OOM'd)
 * but this is incredibly unlikely, and questionable whether the client
 * could do MDS recovery under OOM anyways... */
static void mdc_realloc_openmsg(struct ptlrpc_request *req,
                                struct mds_body *body)
{
        int old_len, new_size, old_size;
        struct lustre_msg *old_msg = req->rq_reqmsg;
        struct lustre_msg *new_msg;
        int offset;

        if (mdc_req_is_2_0_server(req))
                offset = 4;
        else
                offset = 2;

        old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + offset);
        old_size = lustre_packed_msg_size(old_msg);
        lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + offset,
                              body->eadatasize);
        new_size = lustre_packed_msg_size(old_msg);

        OBD_ALLOC(new_msg, new_size);
        if (new_msg != NULL) {
                DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
                          body->eadatasize);
                memcpy(new_msg, old_msg, old_size);

                spin_lock(&req->rq_lock);
                req->rq_reqmsg = new_msg;
                req->rq_reqlen = new_size;
                spin_unlock(&req->rq_lock);

                OBD_FREE(old_msg, old_size);
        } else {
                lustre_msg_set_buflen(old_msg,
                                      DLM_INTENT_REC_OFF + offset, old_len);
                body->valid &= ~OBD_MD_FLEASIZE;
                body->eadatasize = 0;
        }
}

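/* Build the DLM enqueue request for an IT_OPEN intent: the ldlm_request and
 * ldlm_intent are followed by an mds_rec_create (mdt_rec_create for 2.0
 * servers), the name and the LOV EA.  Conflicting OPEN locks on the child
 * and UPDATE locks on the parent are cancelled in the same RPC, and the
 * request is marked for replay so the open can be re-sent after recovery. */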
static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
                                                   struct lookup_intent *it,
                                                   struct mdc_op_data *data,
                                                   void *lmm, __u32 lmmsize)
{
        struct ptlrpc_request *req;
        struct ldlm_intent *lit;
        struct obd_device *obddev = class_exp2obd(exp);
        __u32 size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_INTENT_IT_OFF]   = sizeof(*lit),
                        [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_create),
                        [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
                        /* As an optimization, we allocate an RPC request buffer
                         * for at least a default-sized LOV EA even if we aren't
                         * sending one.  We grow the whole request to the next
                         * power-of-two size since we get that much from a slab
                         * allocation anyways. This avoids an allocation below
                         * in the common case where we need to save a
                         * default-sized LOV EA for open replay. */
                        [DLM_INTENT_REC_OFF+2]= max(lmmsize,
                                         obddev->u.cli.cl_default_mds_easize) };
        __u32 repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
                           [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
                           [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
                                                        cl_max_mds_easize,
                           [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
        CFS_LIST_HEAD(cancels);
        int do_join = (it->it_flags & O_JOIN_FILE) && data->data;
        int count = 0;
        int bufcount = 6;
        int repbufcount = 5;
        int mode;
        int rc;
        ENTRY;

        it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
        if (mdc_exp_is_2_0_server(exp)) {
                size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create);
                size[DLM_INTENT_REC_OFF+4] = size[DLM_INTENT_REC_OFF+2];
                size[DLM_INTENT_REC_OFF+3] = size[DLM_INTENT_REC_OFF+1];
                size[DLM_INTENT_REC_OFF+2] = 0; /* capa */
                size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
                bufcount = 8;
                repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa);
                repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa);
                repbufcount = 7;
        }
        rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
                             bufcount, size);
        if (rc & (rc - 1))
                size[bufcount - 1] = min(size[bufcount - 1] + round_up(rc) - rc,
                                         (__u32)obddev->u.cli.cl_max_mds_easize);

        /* If inode is known, cancel conflicting OPEN locks. */
        if (data->fid2.id) {
                if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
                        mode = LCK_CW;
#ifdef FMODE_EXEC
                else if (it->it_flags & FMODE_EXEC)
                        mode = LCK_PR;
#endif
                else
                        mode = LCK_CR;
                count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
                                                mode, MDS_INODELOCK_OPEN);
        }

        /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
        if (it->it_op & IT_CREAT || do_join)
                mode = LCK_EX;
        else
                mode = LCK_CR;
        count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
                                         MDS_INODELOCK_UPDATE);
        if (do_join) {
                __u64 head_size = (*(__u64 *)data->data);
                /* join is like an unlink of the tail */
                if (mdc_exp_is_2_0_server(exp)) {
                        size[DLM_INTENT_REC_OFF+5]=sizeof(struct mdt_rec_join);
                } else {
                        size[DLM_INTENT_REC_OFF+3]=sizeof(struct mds_rec_join);
                }
                bufcount++;

                req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
                if (req)
                        mdc_join_pack(req, bufcount - 1, data, head_size);
        } else {
                req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
                it->it_flags &= ~O_JOIN_FILE;
        }

        if (req) {
                spin_lock(&req->rq_lock);
                req->rq_replay = 1;
                spin_unlock(&req->rq_lock);

                /* pack the intent */
                lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
                                     sizeof(*lit));
                lit->opc = (__u64)it->it_op;

                /* pack the intended request */
                mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
                              it->it_create_mode, 0, it->it_flags,
                              lmm, lmmsize);

                ptlrpc_req_set_repsize(req, repbufcount, repsize);
        }
        RETURN(req);
}

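/* Build the DLM enqueue request for an IT_UNLINK intent: ldlm_request,
 * ldlm_intent, mds_rec_unlink (mdt_rec_unlink for 2.0 servers) and the name.
 * The reply is sized to fit the unlinked object's EA and unlink cookies. */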
static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
                                                     struct lookup_intent *it,
                                                     struct mdc_op_data *data)
{
        struct ptlrpc_request *req;
        struct ldlm_intent *lit;
        struct obd_device *obddev = class_exp2obd(exp);
        __u32 size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_INTENT_IT_OFF]   = sizeof(*lit),
                        [DLM_INTENT_REC_OFF]  = mdc_exp_is_2_0_server(exp) ?
                                                sizeof(struct mdt_rec_unlink) :
                                                sizeof(struct mds_rec_unlink),
                        [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
        __u32 repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
                           [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
                           [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
                                                        cl_max_mds_easize,
                           [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
                                                        cl_max_mds_cookiesize };
        ENTRY;

        req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
        if (req) {
                /* pack the intent */
                lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
                                     sizeof(*lit));
                lit->opc = (__u64)it->it_op;

                /* pack the intended request */
                mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);

                ptlrpc_req_set_repsize(req, 5, repsize);
        }
        RETURN(req);
}

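/* Build the DLM enqueue request for an IT_LOOKUP/IT_GETATTR intent: the
 * embedded getattr asks for attributes, EA and ACL so a single RPC can both
 * grant the lock and return everything needed to instantiate the inode. */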
static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
                                                     struct lookup_intent *it,
                                                     struct mdc_op_data *data)
{
        struct ptlrpc_request *req;
        struct ldlm_intent *lit;
        struct obd_device *obddev = class_exp2obd(exp);
        __u32 size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                        [DLM_INTENT_IT_OFF]   = sizeof(*lit),
                        [DLM_INTENT_REC_OFF]  = sizeof(struct mdt_body),
                        [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
                        [DLM_INTENT_REC_OFF+2]= 0 };
        __u32 repsize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
                           [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
                           [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
                                                        cl_max_mds_easize,
                           [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
                           [DLM_REPLY_REC_OFF+3] = 0 };
        obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
        int bufcount = 5;
        ENTRY;

        if (mdc_exp_is_2_0_server(exp)) {
                size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
                size[DLM_INTENT_REC_OFF+2] = data->namelen + 1;
                bufcount = 6;
        }
        req = ldlm_prep_enqueue_req(exp, bufcount, size, NULL, 0);
        if (req) {
                /* pack the intent */
                lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
                                     sizeof(*lit));
                lit->opc = (__u64)it->it_op;

                /* pack the intended request */
                mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
                                 data);
                ptlrpc_req_set_repsize(req, bufcount, repsize);
        }
        RETURN(req);
}

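/* Build a plain enqueue request for IT_READDIR: no ldlm_intent record is
 * packed, so the request carries only the lock request itself and the reply
 * only the ldlm_reply and an ost_lvb. */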
static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
{
        struct ptlrpc_request *req;
        __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
        __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
                           [DLM_REPLY_REC_OFF] = sizeof(struct ost_lvb) };
        ENTRY;

        req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
        if (req)
                ptlrpc_req_set_repsize(req, 3, repsize);
        RETURN(req);
}

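/* Common post-enqueue processing: mark replayed/replayable requests as
 * INTENT_ONLY, copy disposition, status, lock handle and mode from the
 * ldlm_reply into the lookup_intent, fix up the lock mode if the server
 * granted a different one, swab the mds_body, and stash a returned LOV EA
 * back into the request buffer so an open can be replayed after recovery. */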
static int mdc_finish_enqueue(struct obd_export *exp,
                              struct ptlrpc_request *req,
                              struct ldlm_enqueue_info *einfo,
                              struct lookup_intent *it,
                              struct lustre_handle *lockh,
                              int rc)
{
        struct ldlm_request *lockreq;
        struct ldlm_reply *lockrep;
        ENTRY;

        LASSERT(rc >= 0);
        /* Similarly, if we're going to replay this request, we don't want to
         * actually get a lock, just perform the intent. */
        if (req->rq_transno || req->rq_replay) {
                lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
                                         sizeof(*lockreq));
                lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
        }

        if (rc == ELDLM_LOCK_ABORTED) {
                einfo->ei_mode = 0;
                memset(lockh, 0, sizeof(*lockh));
                rc = 0;
        } else { /* rc = 0 */
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
                LASSERT(lock);

                /* If the server gave us back a different lock mode, we should
                 * fix up our variables. */
                if (lock->l_req_mode != einfo->ei_mode) {
                        ldlm_lock_addref(lockh, lock->l_req_mode);
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        einfo->ei_mode = lock->l_req_mode;
                }
                LDLM_LOCK_PUT(lock);
        }

        lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                 sizeof(*lockrep));
        LASSERT(lockrep != NULL);  /* checked by ldlm_cli_enqueue() */
        /* swabbed by ldlm_cli_enqueue() */
        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));

        it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
        it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
        it->d.lustre.it_lock_mode = einfo->ei_mode;
        it->d.lustre.it_lock_handle = lockh->cookie;
        it->d.lustre.it_data = req;

        if (it->d.lustre.it_status < 0 && req->rq_replay)
                mdc_clear_replay_flag(req, it->d.lustre.it_status);

        /* If we're doing an IT_OPEN which did not result in an actual
         * successful open, then we need to remove the bit which saves
         * this request for unconditional replay.
         *
         * It's important that we do this first!  Otherwise we might exit the
         * function without doing so, and try to replay a failed create
         * (bug 3440) */
        if ((it->it_op & IT_OPEN) &&
            req->rq_replay &&
            (!it_disposition(it, DISP_OPEN_OPEN) ||
             it->d.lustre.it_status != 0))
                mdc_clear_replay_flag(req, it->d.lustre.it_status);

        DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
                  it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);

        /* We know what to expect, so we do any byte flipping required here */
        if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                struct mds_body *body;

                body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
                                         lustre_swab_mds_body);
                if (body == NULL) {
                        CERROR ("Can't swab mds_body\n");
                        RETURN (-EPROTO);
                }

                /* If this is a successful OPEN request, we need to set
                   replay handler and data early, so that if replay happens
                   immediately after swabbing below, new reply is swabbed
                   by that handler correctly */
                if (it_disposition(it, DISP_OPEN_OPEN) &&
                    !it_open_error(DISP_OPEN_OPEN, it))
                        mdc_set_open_replay_data(NULL, req);

                if ((body->valid & OBD_MD_FLEASIZE) != 0) {
                        void *eadata;

                        /* The eadata is opaque; just check that it is there.
                         * Eventually, obd_unpackmd() will check the contents */
                        eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
                                                    body->eadatasize, NULL);
                        if (eadata == NULL) {
                                CERROR ("Missing/short eadata\n");
                                RETURN (-EPROTO);
                        }
                        /* We save the reply LOV EA in case we have to replay
                         * a create for recovery.  If we didn't allocate a
                         * large enough request buffer above we need to
                         * reallocate it here to hold the actual LOV EA. */
                        if (it->it_op & IT_OPEN) {
                                int offset = DLM_INTENT_REC_OFF;
                                void *lmm;

                                if (mdc_req_is_2_0_server(req))
                                        offset += 4;
                                else
                                        offset += 2;

                                if (lustre_msg_buflen(req->rq_reqmsg, offset) <
                                    body->eadatasize)
                                        mdc_realloc_openmsg(req, body);

                                lmm = lustre_msg_buf(req->rq_reqmsg, offset,
                                                     body->eadatasize);
                                if (lmm)
                                        memcpy(lmm, eadata, body->eadatasize);
                        }
                }
        }

        RETURN(rc);
}

/* We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type. */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                struct lookup_intent *it, struct mdc_op_data *data,
                struct lustre_handle *lockh, void *lmm, int lmmsize,
                int extra_lock_flags)
{
        struct ptlrpc_request *req;
        struct obd_device *obddev = class_exp2obd(exp);
        struct ldlm_res_id res_id;
        ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
        int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
        int rc;
        ENTRY;

        fid_build_reg_res_name((void *)&data->fid1, &res_id);
        LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
        if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;

        if (it->it_op & IT_OPEN) {
                if ((it->it_op & IT_CREAT) && mdc_exp_is_2_0_server(exp)) {
                        struct client_obd *cli = &obddev->u.cli;
                        data->fid3 = data->fid2;
                        rc = mdc_fid_alloc(cli->cl_seq, (void *)&data->fid2);
                        if (rc) {
                                CERROR("fid allocation result: %d\n", rc);
                                RETURN(rc);
                        }
                }
                req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
                if (it->it_flags & O_JOIN_FILE) {
                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
                }
        } else if (it->it_op & IT_UNLINK) {
                req = mdc_intent_unlink_pack(exp, it, data);
        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
                req = mdc_intent_lookup_pack(exp, it, data);
        } else if (it->it_op == IT_READDIR) {
                req = mdc_intent_readdir_pack(exp);
        } else {
                CERROR("bad it_op %x\n", it->it_op);
                RETURN(-EINVAL);
        }

        if (!req)
                RETURN(-ENOMEM);

         /* It is important to obtain rpc_lock first (if applicable), so that
          * threads that are serialised with rpc_lock are not polluting our
          * rpcs in flight counter */
        mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
        mdc_enter_request(&obddev->u.cli);
        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
                              0, NULL, lockh, 0);
        mdc_exit_request(&obddev->u.cli);
        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
        if (rc < 0) {
                CERROR("ldlm_cli_enqueue: %d\n", rc);
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);

        RETURN(rc);
}
EXPORT_SYMBOL(mdc_enqueue);

int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
                        struct ll_fid *fid)
{
                /* We could just return 1 immediately, but since we should only
                 * be called in revalidate_it if we already have a lock, let's
                 * verify that. */
        struct ldlm_res_id res_id;
        struct lustre_handle lockh;
        ldlm_policy_data_t policy;
        ldlm_mode_t mode;
        ENTRY;

        fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
        /* As not all attributes are kept under update lock, e.g.
           owner/group/acls are under lookup lock, we need both
           ibits for GETATTR. */
        policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
                MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
                MDS_INODELOCK_LOOKUP;

        mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
                               LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
                               &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
        if (mode) {
                memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
                it->d.lustre.it_lock_mode = mode;
        }

        RETURN(!!mode);
}
EXPORT_SYMBOL(mdc_revalidate_lock);

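/* Finish processing an intent lock: check the server's disposition bits for
 * errors, detect a changed fid during revalidation (-ESTALE), take extra
 * request references for a successful create/open (dropped later in
 * ll_create_node/ll_file_open), and cancel the newly granted lock if a
 * matching one is already cached. */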
static int mdc_finish_intent_lock(struct obd_export *exp,
                                  struct ptlrpc_request *req,
                                  struct mdc_op_data *data,
                                  struct lookup_intent *it,
                                  struct lustre_handle *lockh)
{
        struct mds_body *mds_body;
        struct lustre_handle old_lock;
        struct ldlm_lock *lock;
        int rc;
        ENTRY;

        LASSERT(req != NULL);
        LASSERT(req != LP_POISON);
        LASSERT(req->rq_repmsg != LP_POISON);

        if (!it_disposition(it, DISP_IT_EXECD)) {
                /* The server failed before it even started executing the
                 * intent, i.e. because it couldn't unpack the request. */
                LASSERT(it->d.lustre.it_status != 0);
                RETURN(it->d.lustre.it_status);
        }
        rc = it_open_error(DISP_IT_EXECD, it);
        if (rc)
                RETURN(rc);

        mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
                                  sizeof(*mds_body));
        /* mdc_enqueue checked */
        LASSERT(mds_body != NULL);
        /* mdc_enqueue swabbed */
        LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));

        /* If we were revalidating a fid/name pair, mark the intent in
         * case we fail and get called again from lookup */

        if (data->fid2.id && (it->it_op != IT_GETATTR) &&
           ( !mdc_exp_is_2_0_server(exp) ||
             (mdc_exp_is_2_0_server(exp) && (it->it_flags & O_CHECK_STALE)))) {
                it_set_disposition(it, DISP_ENQ_COMPLETE);

                /* Also: did we find the same inode? */
                if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)) &&
                    memcmp(&data->fid3, &mds_body->fid1, sizeof(data->fid3)))
                        RETURN(-ESTALE);
        }

        rc = it_open_error(DISP_LOOKUP_EXECD, it);
        if (rc)
                RETURN(rc);

        /* keep requests around for the multiple phases of the call
         * this shows the DISP_XX must guarantee we make it into the call
         */
        if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
            it_disposition(it, DISP_OPEN_CREATE) &&
            !it_open_error(DISP_OPEN_CREATE, it)) {
                it_set_disposition(it, DISP_ENQ_CREATE_REF);
                ptlrpc_request_addref(req); /* balanced in ll_create_node */
        }
        if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
            it_disposition(it, DISP_OPEN_OPEN) &&
            !it_open_error(DISP_OPEN_OPEN, it)) {
                it_set_disposition(it, DISP_ENQ_OPEN_REF);
                ptlrpc_request_addref(req); /* balanced in ll_file_open */
                /* BUG 11546 - eviction in the middle of open rpc processing */
                OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
        }

        if (it->it_op & IT_CREAT) {
                /* XXX this belongs in ll_create_it */
        } else if (it->it_op == IT_OPEN) {
                LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
        } else {
                LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
        }

        /* If we already have a matching lock, then cancel the new
         * one.  We have to set the data here instead of in
         * mdc_enqueue, because we need to use the child's inode as
         * the l_ast_data to match, and that's not available until
         * intent_finish has performed the iget().) */
        lock = ldlm_handle2lock(lockh);
        if (lock) {
                ldlm_policy_data_t policy = lock->l_policy_data;

                LDLM_DEBUG(lock, "matching against this");
                LDLM_LOCK_PUT(lock);
                memcpy(&old_lock, lockh, sizeof(*lockh));
                if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
                                    LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
                        ldlm_lock_decref_and_cancel(lockh,
                                                    it->d.lustre.it_lock_mode);
                        memcpy(lockh, &old_lock, sizeof(old_lock));
                        memcpy(&it->d.lustre.it_lock_handle, lockh,
                               sizeof(*lockh));
                }
        }

        CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
               data->namelen, data->name, ldlm_it2str(it->it_op),
               it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
        RETURN(rc);
}

/*
 * This long block is all about fixing up the lock and request state
 * so that it is correct as of the moment _before_ the operation was
 * applied; that way, the VFS will think that everything is normal and
 * call Lustre's regular VFS methods.
 *
 * If we're performing a creation, that means that unless the creation
 * failed with EEXIST, we should fake up a negative dentry.
 *
 * For everything else, we want the lookup to succeed.
 *
 * One additional note: if CREATE or OPEN succeeded, we add an extra
 * reference to the request because we need to keep it around until
 * ll_create/ll_open gets called.
 *
 * The server will return to us, in it_disposition, an indication of
 * exactly what d.lustre.it_status refers to.
 *
 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
 * otherwise if DISP_OPEN_CREATE is set, then it_status is the
 * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * was successful.
 *
 * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
 * child lookup.
 */
int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
                    void *lmm, int lmmsize, struct lookup_intent *it,
                    int lookup_flags, struct ptlrpc_request **reqp,
                    ldlm_blocking_callback cb_blocking, int extra_lock_flags)
{
        struct lustre_handle lockh;
        int rc;
        ENTRY;

        LASSERT(it);

        CDEBUG(D_DLMTRACE,"name: %.*s("DFID") in inode ("DFID"), "
               "intent: %s flags %#o\n",
               op_data->namelen, op_data->name,
               PFID(((void *)&op_data->fid2)),
               PFID(((void *)&op_data->fid1)),
               ldlm_it2str(it->it_op), it->it_flags);

        lockh.cookie = 0;
        if (op_data->fid2.id &&
            (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
                rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
                /* Only return failure if it was not GETATTR by cfid
                   (from inode_revalidate) */
                if (rc || op_data->namelen != 0)
                        RETURN(rc);
        }

        /* lookup_it may be called only after revalidate_it has run, because
         * revalidate_it cannot return errors, only zero.  Returning zero causes
         * this call to lookup, which *can* return an error.
         *
         * We only want to execute the request associated with the intent one
         * time, however, so don't send the request again.  Instead, skip past
         * this and use the request from revalidate.  In this case, revalidate
         * never dropped its reference, so the refcounts are all OK */
        if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
                struct ldlm_enqueue_info einfo =
                        { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
                          ldlm_completion_ast, NULL, NULL };

                rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
                                 lmm, lmmsize, extra_lock_flags);
                if (rc < 0)
                        RETURN(rc);
        } else if (!op_data->fid2.id) {
                /* DISP_ENQ_COMPLETE set means there is extra reference on
                 * request referenced from this intent, saved for subsequent
                 * lookup.  This path is executed when we proceed to this
                 * lookup, so we clear DISP_ENQ_COMPLETE */
                it_clear_disposition(it, DISP_ENQ_COMPLETE);
        }

        *reqp = it->d.lustre.it_data;
        rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);

        RETURN(rc);
}
EXPORT_SYMBOL(mdc_intent_lock);

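/* Reply-interpret callback for mdc_intent_getattr_async(): completes the
 * enqueue, runs the common mdc_finish_enqueue()/mdc_finish_intent_lock()
 * processing, then hands the result to the caller through minfo->mi_cb. */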
static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
                                              void *unused, int rc)
{
        struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
        struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
        struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
        struct lookup_intent     *it;
        struct lustre_handle     *lockh;
        struct obd_device        *obddev;
        int                       flags = LDLM_FL_HAS_INTENT;
        ENTRY;

        it    = &minfo->mi_it;
        lockh = &minfo->mi_lockh;

        obddev = class_exp2obd(exp);

        mdc_exit_request(&obddev->u.cli);
        if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
                rc = -ETIMEDOUT;

        rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
                                   &flags, NULL, 0, NULL, lockh, rc);
        if (rc < 0) {
                CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
                mdc_clear_replay_flag(req, rc);
                GOTO(out, rc);
        }

        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
        if (rc)
                GOTO(out, rc);

        rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
        GOTO(out, rc);
out:
        OBD_FREE_PTR(einfo);
        minfo->mi_cb(exp, req, minfo, rc);

        return 0;
}

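/* Fire an IT_GETATTR intent enqueue without waiting for the reply: the
 * request is queued to ptlrpcd and completion is handled in
 * mdc_intent_getattr_async_interpret(), which invokes the caller's mi_cb
 * (used e.g. by the statahead code to prefetch attributes). */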
int mdc_intent_getattr_async(struct obd_export *exp,
                             struct md_enqueue_info *minfo,
                             struct ldlm_enqueue_info *einfo)
{
        struct mdc_op_data      *op_data = &minfo->mi_data;
        struct lookup_intent    *it = &minfo->mi_it;
        struct ptlrpc_request   *req;
        struct obd_device       *obddev = class_exp2obd(exp);
        struct ldlm_res_id res_id;
        ldlm_policy_data_t       policy = {
                                        .l_inodebits = { MDS_INODELOCK_LOOKUP }
                                 };
        int                      rc;
        int                      flags = LDLM_FL_HAS_INTENT;
        ENTRY;

        CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
               op_data->namelen, op_data->name, op_data->fid1.id,
               ldlm_it2str(it->it_op), it->it_flags);

        fid_build_reg_res_name((void *)&op_data->fid1, &res_id);
        req = mdc_intent_lookup_pack(exp, it, op_data);
        if (!req)
                RETURN(-ENOMEM);

        mdc_enter_request(&obddev->u.cli);
        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
                              0, NULL, &minfo->mi_lockh, 1);
        if (rc < 0) {
                mdc_exit_request(&obddev->u.cli);
                RETURN(rc);
        }

        req->rq_async_args.pointer_arg[0] = exp;
        req->rq_async_args.pointer_arg[1] = minfo;
        req->rq_async_args.pointer_arg[2] = einfo;
        req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(mdc_intent_getattr_async);