Whamcloud - gitweb
LU-1146 build: batch update copyright messages
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  */
38
39 #ifndef EXPORT_SYMTAB
40 # define EXPORT_SYMTAB
41 #endif
42 #define DEBUG_SUBSYSTEM S_MDC
43
44 #ifdef __KERNEL__
45 # include <linux/module.h>
46 # include <linux/pagemap.h>
47 # include <linux/miscdevice.h>
48 # include <linux/init.h>
49 #else
50 # include <liblustre.h>
51 #endif
52
53 #include <lustre_acl.h>
54 #include <obd_class.h>
55 #include <lustre_dlm.h>
56 /* fid_res_name_eq() */
57 #include <lustre_fid.h>
58 #include <lprocfs_status.h>
59 #include "mdc_internal.h"
60
/*
 * Groups an export together with an md enqueue info and an ldlm enqueue
 * info so that the three pointers can be handed around as one argument.
 * NOTE(review): no user of this struct is visible in this part of the file;
 * presumably it is the argument block of an asynchronous getattr enqueue
 * callback - confirm against the callers.
 */
struct mdc_getattr_args {
        struct obd_export *ga_exp;              /* export */
        struct md_enqueue_info *ga_minfo;       /* md enqueue info */
        struct ldlm_enqueue_info *ga_einfo;     /* ldlm enqueue info */
};
66
67 int it_disposition(struct lookup_intent *it, int flag)
68 {
69         return it->d.lustre.it_disposition & flag;
70 }
71 EXPORT_SYMBOL(it_disposition);
72
73 void it_set_disposition(struct lookup_intent *it, int flag)
74 {
75         it->d.lustre.it_disposition |= flag;
76 }
77 EXPORT_SYMBOL(it_set_disposition);
78
79 void it_clear_disposition(struct lookup_intent *it, int flag)
80 {
81         it->d.lustre.it_disposition &= ~flag;
82 }
83 EXPORT_SYMBOL(it_clear_disposition);
84
85 int it_open_error(int phase, struct lookup_intent *it)
86 {
87         if (it_disposition(it, DISP_OPEN_OPEN)) {
88                 if (phase >= DISP_OPEN_OPEN)
89                         return it->d.lustre.it_status;
90                 else
91                         return 0;
92         }
93
94         if (it_disposition(it, DISP_OPEN_CREATE)) {
95                 if (phase >= DISP_OPEN_CREATE)
96                         return it->d.lustre.it_status;
97                 else
98                         return 0;
99         }
100
101         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
102                 if (phase >= DISP_LOOKUP_EXECD)
103                         return it->d.lustre.it_status;
104                 else
105                         return 0;
106         }
107
108         if (it_disposition(it, DISP_IT_EXECD)) {
109                 if (phase >= DISP_IT_EXECD)
110                         return it->d.lustre.it_status;
111                 else
112                         return 0;
113         }
114         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
115                it->d.lustre.it_status);
116         LBUG();
117         return 0;
118 }
119 EXPORT_SYMBOL(it_open_error);
120
121 /* this must be called on a lockh that is known to have a referenced lock */
122 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
123                       __u64 *bits)
124 {
125         struct ldlm_lock *lock;
126         ENTRY;
127
128         if(bits)
129                 *bits = 0;
130
131         if (!*lockh)
132                 RETURN(0);
133
134         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
135
136         LASSERT(lock != NULL);
137         lock_res_and_lock(lock);
138 #ifdef __KERNEL__
139         if (lock->l_ast_data && lock->l_ast_data != data) {
140                 struct inode *new_inode = data;
141                 struct inode *old_inode = lock->l_ast_data;
142                 LASSERTF(old_inode->i_state & I_FREEING,
143                          "Found existing inode %p/%lu/%u state %lu in lock: "
144                          "setting data to %p/%lu/%u\n", old_inode,
145                          old_inode->i_ino, old_inode->i_generation,
146                          old_inode->i_state,
147                          new_inode, new_inode->i_ino, new_inode->i_generation);
148         }
149 #endif
150         lock->l_ast_data = data;
151         if (bits)
152                 *bits = lock->l_policy_data.l_inodebits.bits;
153
154         unlock_res_and_lock(lock);
155         LDLM_LOCK_PUT(lock);
156
157         RETURN(0);
158 }
159
160 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
161                            const struct lu_fid *fid, ldlm_type_t type,
162                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
163                            struct lustre_handle *lockh)
164 {
165         struct ldlm_res_id res_id;
166         ldlm_mode_t rc;
167         ENTRY;
168
169         fid_build_reg_res_name(fid, &res_id);
170         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
171                              &res_id, type, policy, mode, lockh, 0);
172         RETURN(rc);
173 }
174
175 int mdc_cancel_unused(struct obd_export *exp,
176                       const struct lu_fid *fid,
177                       ldlm_policy_data_t *policy,
178                       ldlm_mode_t mode,
179                       ldlm_cancel_flags_t flags,
180                       void *opaque)
181 {
182         struct ldlm_res_id res_id;
183         struct obd_device *obd = class_exp2obd(exp);
184         int rc;
185
186         ENTRY;
187
188         fid_build_reg_res_name(fid, &res_id);
189         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
190                                              policy, mode, flags, opaque);
191         RETURN(rc);
192 }
193
194 int mdc_change_cbdata(struct obd_export *exp,
195                       const struct lu_fid *fid,
196                       ldlm_iterator_t it, void *data)
197 {
198         struct ldlm_res_id res_id;
199         ENTRY;
200
201         fid_build_reg_res_name(fid, &res_id);
202         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
203                               &res_id, it, data);
204
205         EXIT;
206         return 0;
207 }
208
209 /* find any ldlm lock of the inode in mdc
210  * return 0    not find
211  *        1    find one
212  *      < 0    error */
213 int mdc_find_cbdata(struct obd_export *exp,
214                     const struct lu_fid *fid,
215                     ldlm_iterator_t it, void *data)
216 {
217         struct ldlm_res_id res_id;
218         int rc = 0;
219         ENTRY;
220
221         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
222         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
223                                    it, data);
224         if (rc == LDLM_ITER_STOP)
225                 RETURN(1);
226         else if (rc == LDLM_ITER_CONTINUE)
227                 RETURN(0);
228         RETURN(rc);
229 }
230
231 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
232 {
233         /* Don't hold error requests for replay. */
234         if (req->rq_replay) {
235                 cfs_spin_lock(&req->rq_lock);
236                 req->rq_replay = 0;
237                 cfs_spin_unlock(&req->rq_lock);
238         }
239         if (rc && req->rq_transno != 0) {
240                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
241                 LBUG();
242         }
243 }
244
245 /* Save a large LOV EA into the request buffer so that it is available
246  * for replay.  We don't do this in the initial request because the
247  * original request doesn't need this buffer (at most it sends just the
248  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
249  * buffer and may also be difficult to allocate and save a very large
250  * request buffer for each open. (bug 5707)
251  *
252  * OOM here may cause recovery failure if lmm is needed (only for the
253  * original open if the MDS crashed just when this client also OOM'd)
254  * but this is incredibly unlikely, and questionable whether the client
255  * could do MDS recovery under OOM anyways... */
256 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
257                                 struct mdt_body *body)
258 {
259         int     rc;
260
261         /* FIXME: remove this explicit offset. */
262         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
263                                         body->eadatasize);
264         if (rc) {
265                 CERROR("Can't enlarge segment %d size to %d\n",
266                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
267                 body->valid &= ~OBD_MD_FLEASIZE;
268                 body->eadatasize = 0;
269         }
270 }
271
272 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
273                                                    struct lookup_intent *it,
274                                                    struct md_op_data *op_data,
275                                                    void *lmm, int lmmsize,
276                                                    void *cb_data)
277 {
278         struct ptlrpc_request *req;
279         struct obd_device     *obddev = class_exp2obd(exp);
280         struct ldlm_intent    *lit;
281         CFS_LIST_HEAD(cancels);
282         int                    count = 0;
283         int                    mode;
284         int                    rc;
285         ENTRY;
286
287         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
288
289         /* XXX: openlock is not cancelled for cross-refs. */
290         /* If inode is known, cancel conflicting OPEN locks. */
291         if (fid_is_sane(&op_data->op_fid2)) {
292                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
293                         mode = LCK_CW;
294 #ifdef FMODE_EXEC
295                 else if (it->it_flags & FMODE_EXEC)
296                         mode = LCK_PR;
297 #endif
298                 else
299                         mode = LCK_CR;
300                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
301                                                 &cancels, mode,
302                                                 MDS_INODELOCK_OPEN);
303         }
304
305         /* If CREATE, cancel parent's UPDATE lock. */
306         if (it->it_op & IT_CREAT)
307                 mode = LCK_EX;
308         else
309                 mode = LCK_CR;
310         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
311                                          &cancels, mode,
312                                          MDS_INODELOCK_UPDATE);
313
314         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
315                                    &RQF_LDLM_INTENT_OPEN);
316         if (req == NULL) {
317                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
318                 RETURN(ERR_PTR(-ENOMEM));
319         }
320
321         /* parent capability */
322         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
323         /* child capability, reserve the size according to parent capa, it will
324          * be filled after we get the reply */
325         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
326
327         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
328                              op_data->op_namelen + 1);
329         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
330                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
331
332         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
333         if (rc) {
334                 ptlrpc_request_free(req);
335                 return NULL;
336         }
337
338         cfs_spin_lock(&req->rq_lock);
339         req->rq_replay = req->rq_import->imp_replayable;
340         cfs_spin_unlock(&req->rq_lock);
341
342         /* pack the intent */
343         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
344         lit->opc = (__u64)it->it_op;
345
346         /* pack the intended request */
347         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
348                       lmmsize);
349
350         /* for remote client, fetch remote perm for current user */
351         if (client_is_remote(exp))
352                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
353                                      sizeof(struct mdt_remote_perm));
354         ptlrpc_request_set_replen(req);
355         return req;
356 }
357
358 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
359                                                      struct lookup_intent *it,
360                                                      struct md_op_data *op_data)
361 {
362         struct ptlrpc_request *req;
363         struct obd_device     *obddev = class_exp2obd(exp);
364         struct ldlm_intent    *lit;
365         int                    rc;
366         ENTRY;
367
368         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
369                                    &RQF_LDLM_INTENT_UNLINK);
370         if (req == NULL)
371                 RETURN(ERR_PTR(-ENOMEM));
372
373         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
374         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
375                              op_data->op_namelen + 1);
376
377         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
378         if (rc) {
379                 ptlrpc_request_free(req);
380                 RETURN(ERR_PTR(rc));
381         }
382
383         /* pack the intent */
384         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
385         lit->opc = (__u64)it->it_op;
386
387         /* pack the intended request */
388         mdc_unlink_pack(req, op_data);
389
390         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
391                              obddev->u.cli.cl_max_mds_easize);
392         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
393                              obddev->u.cli.cl_max_mds_cookiesize);
394         ptlrpc_request_set_replen(req);
395         RETURN(req);
396 }
397
398 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
399                                                       struct lookup_intent *it,
400                                                       struct md_op_data *op_data)
401 {
402         struct ptlrpc_request *req;
403         struct obd_device     *obddev = class_exp2obd(exp);
404         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
405                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
406                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
407                                        (client_is_remote(exp) ?
408                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
409         struct ldlm_intent    *lit;
410         int                    rc;
411         ENTRY;
412
413         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
414                                    &RQF_LDLM_INTENT_GETATTR);
415         if (req == NULL)
416                 RETURN(ERR_PTR(-ENOMEM));
417
418         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
419         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
420                              op_data->op_namelen + 1);
421
422         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
423         if (rc) {
424                 ptlrpc_request_free(req);
425                 RETURN(ERR_PTR(rc));
426         }
427
428         /* pack the intent */
429         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
430         lit->opc = (__u64)it->it_op;
431
432         /* pack the intended request */
433         mdc_getattr_pack(req, valid, it->it_flags, op_data,
434                          obddev->u.cli.cl_max_mds_easize);
435
436         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
437                              obddev->u.cli.cl_max_mds_easize);
438         if (client_is_remote(exp))
439                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
440                                      sizeof(struct mdt_remote_perm));
441         ptlrpc_request_set_replen(req);
442         RETURN(req);
443 }
444
445 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
446 {
447         struct ptlrpc_request *req;
448         int rc;
449         ENTRY;
450
451         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
452         if (req == NULL)
453                 RETURN(ERR_PTR(-ENOMEM));
454
455         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
456         if (rc) {
457                 ptlrpc_request_free(req);
458                 RETURN(ERR_PTR(rc));
459         }
460
461         ptlrpc_request_set_replen(req);
462         RETURN(req);
463 }
464
/*
 * Post-process the reply of an intent enqueue.
 *
 * In order, this function:
 *  - marks the lock request LDLM_FL_INTENT_ONLY when the request has a
 *    transno or is flagged for replay;
 *  - normalises the enqueue result: ELDLM_LOCK_ABORTED becomes success with
 *    a zeroed lock handle and lock mode 0; otherwise the lock reference is
 *    moved to the mode recorded in the lock if it differs from the one
 *    asked for in einfo;
 *  - copies disposition, status, lock mode, lock handle and the request
 *    itself into the intent;
 *  - clears the replay flag of requests whose intent failed, and of open
 *    intents that did not end in a successful open;
 *  - for open/unlink/lookup/getattr intents, checks that the reply buffers
 *    announced by the mdt_body are present, and for replayable opens
 *    copies the reply EA into the request.
 *
 * Parameters:
 *  exp    export the request was sent on
 *  req    the enqueue request with its reply; saved in the intent
 *  einfo  enqueue info; ei_mode may be rewritten
 *  it     intent to fill in
 *  lockh  lock handle; zeroed when the lock was aborted
 *  rc     result of the enqueue, must be >= 0 (asserted)
 *
 * Returns rc (with ELDLM_LOCK_ABORTED turned into 0), or -EPROTO if a reply
 * buffer that should be there is missing.
 */
465 static int mdc_finish_enqueue(struct obd_export *exp,
466                               struct ptlrpc_request *req,
467                               struct ldlm_enqueue_info *einfo,
468                               struct lookup_intent *it,
469                               struct lustre_handle *lockh,
470                               int rc)
471 {
472         struct req_capsule  *pill = &req->rq_pill;
473         struct ldlm_request *lockreq;
474         struct ldlm_reply   *lockrep;
475         ENTRY;
476
477         LASSERT(rc >= 0);
478         /* Similarly, if we're going to replay this request, we don't want to
479          * actually get a lock, just perform the intent. */
480         if (req->rq_transno || req->rq_replay) {
481                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
482                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
483         }
484
485         if (rc == ELDLM_LOCK_ABORTED) {
486                 einfo->ei_mode = 0;
487                 memset(lockh, 0, sizeof(*lockh));
488                 rc = 0;
489         } else { /* rc = 0 */
490                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
491                 LASSERT(lock);
492
493                 /* If the server gave us back a different lock mode, we should
494                  * fix up our variables. */
495                 if (lock->l_req_mode != einfo->ei_mode) {
                        /* Take the reference in the new mode before
                         * dropping the one held in the old mode. */
496                         ldlm_lock_addref(lockh, lock->l_req_mode);
497                         ldlm_lock_decref(lockh, einfo->ei_mode);
498                         einfo->ei_mode = lock->l_req_mode;
499                 }
500                 LDLM_LOCK_PUT(lock);
501         }
502
503         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
504         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
505
        /* Record the outcome carried by the DLM reply, and the lock
         * details, in the intent. */
506         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
507         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
508         it->d.lustre.it_lock_mode = einfo->ei_mode;
509         it->d.lustre.it_lock_handle = lockh->cookie;
510         it->d.lustre.it_data = req;
511
512         if (it->d.lustre.it_status < 0 && req->rq_replay)
513                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
514
515         /* If we're doing an IT_OPEN which did not result in an actual
516          * successful open, then we need to remove the bit which saves
517          * this request for unconditional replay.
518          *
519          * It's important that we do this first!  Otherwise we might exit the
520          * function without doing so, and try to replay a failed create
521          * (bug 3440) */
522         if (it->it_op & IT_OPEN && req->rq_replay &&
523             (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
524                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
525
526         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
527                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
528
529         /* We know what to expect, so we do any byte flipping required here */
530         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
531                 struct mdt_body *body;
532
533                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
534                 if (body == NULL) {
535                         CERROR ("Can't swab mdt_body\n");
536                         RETURN (-EPROTO);
537                 }
538
539                 if (it_disposition(it, DISP_OPEN_OPEN) &&
540                     !it_open_error(DISP_OPEN_OPEN, it)) {
541                         /*
542                          * If this is a successful OPEN request, we need to set
543                          * replay handler and data early, so that if replay
544                          * happens immediately after swabbing below, new reply
545                          * is swabbed by that handler correctly.
546                          */
547                         mdc_set_open_replay_data(NULL, NULL, req);
548                 }
549
550                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
551                         void *eadata;
552
553                          mdc_update_max_ea_from_body(exp, body);
554
555                         /*
556                          * The eadata is opaque; just check that it is there.
557                          * Eventually, obd_unpackmd() will check the contents.
558                          */
559                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
560                                                               body->eadatasize);
561                         if (eadata == NULL)
562                                 RETURN(-EPROTO);
563
564                         /*
565                          * We save the reply LOV EA in case we have to replay a
566                          * create for recovery.  If we didn't allocate a large
567                          * enough request buffer above we need to reallocate it
568                          * here to hold the actual LOV EA.
569                          *
570                          * To not save LOV EA if request is not going to replay
571                          * (for example error one).
572                          */
573                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
574                                 void *lmm;
575                                 if (req_capsule_get_size(pill, &RMF_EADATA,
576                                                          RCL_CLIENT) <
577                                     body->eadatasize)
578                                         mdc_realloc_openmsg(req, body);
579                                 else
580                                         req_capsule_shrink(pill, &RMF_EADATA,
581                                                            body->eadatasize,
582                                                            RCL_CLIENT);
583
584                                 req_capsule_set_size(pill, &RMF_EADATA,
585                                                      RCL_CLIENT,
586                                                      body->eadatasize);
587
588                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
589                                 if (lmm)
590                                         memcpy(lmm, eadata, body->eadatasize);
591                         }
592                 }
593
                /* Remote permission data, when announced by the body, must
                 * be present in the reply. */
594                 if (body->valid & OBD_MD_FLRMTPERM) {
595                         struct mdt_remote_perm *perm;
596
597                         LASSERT(client_is_remote(exp));
598                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
599                                                 lustre_swab_mdt_remote_perm);
600                         if (perm == NULL)
601                                 RETURN(-EPROTO);
602                 }
                /* Likewise for the MDS capability; for opens it is also
                 * copied into the RMF_CAPA2 buffer of the request. */
603                 if (body->valid & OBD_MD_FLMDSCAPA) {
604                         struct lustre_capa *capa, *p;
605
606                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
607                         if (capa == NULL)
608                                 RETURN(-EPROTO);
609
610                         if (it->it_op & IT_OPEN) {
611                                 /* client fid capa will be checked in replay */
612                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
613                                 LASSERT(p);
614                                 *p = *capa;
615                         }
616                 }
                /* The OSS capability, when announced, only has to be
                 * present. */
617                 if (body->valid & OBD_MD_FLOSSCAPA) {
618                         struct lustre_capa *capa;
619
620                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
621                         if (capa == NULL)
622                                 RETURN(-EPROTO);
623                 }
624         }
625
626         RETURN(rc);
627 }
628
629 /* We always reserve enough space in the reply packet for a stripe MD, because
630  * we don't know in advance the file type. */
631 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
632                 struct lookup_intent *it, struct md_op_data *op_data,
633                 struct lustre_handle *lockh, void *lmm, int lmmsize,
634                 struct ptlrpc_request **reqp, int extra_lock_flags)
635 {
636         struct obd_device     *obddev = class_exp2obd(exp);
637         struct ptlrpc_request *req = NULL;
638         int                    flags = extra_lock_flags;
639         int                    rc;
640         struct ldlm_res_id res_id;
641         static const ldlm_policy_data_t lookup_policy =
642                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
643         static const ldlm_policy_data_t update_policy =
644                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
645         ldlm_policy_data_t const *policy = &lookup_policy;
646         ENTRY;
647
648         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
649                  einfo->ei_type);
650
651         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
652
653         if (it)
654                 flags |= LDLM_FL_HAS_INTENT;
655         if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
656                 policy = &update_policy;
657
658         if (reqp)
659                 req = *reqp;
660
661         if (!it) {
662                 /* The only way right now is FLOCK, in this case we hide flock
663                    policy as lmm, but lmmsize is 0 */
664                 LASSERT(lmm && lmmsize == 0);
665                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
666                          einfo->ei_type);
667                 policy = (ldlm_policy_data_t *)lmm;
668                 res_id.name[3] = LDLM_FLOCK;
669         } else if (it->it_op & IT_OPEN) {
670                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
671                                            einfo->ei_cbdata);
672                 policy = &update_policy;
673                 einfo->ei_cbdata = NULL;
674                 lmm = NULL;
675         } else if (it->it_op & IT_UNLINK)
676                 req = mdc_intent_unlink_pack(exp, it, op_data);
677         else if (it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT))
678                 req = mdc_intent_getattr_pack(exp, it, op_data);
679         else if (it->it_op == IT_READDIR)
680                 req = ldlm_enqueue_pack(exp);
681         else {
682                 LBUG();
683                 RETURN(-EINVAL);
684         }
685
686         if (IS_ERR(req))
687                 RETURN(PTR_ERR(req));
688
689         /* It is important to obtain rpc_lock first (if applicable), so that
690          * threads that are serialised with rpc_lock are not polluting our
691          * rpcs in flight counter. We do not do flock request limiting, though*/
692         if (it) {
693                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
694                 rc = mdc_enter_request(&obddev->u.cli);
695                 if (rc != 0) {
696                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
697                         mdc_clear_replay_flag(req, 0);
698                         ptlrpc_req_finished(req);
699                         RETURN(rc);
700                 }
701         }
702
703         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
704                               0, lockh, 0);
705         if (reqp)
706                 *reqp = req;
707
708         if (it) {
709                 mdc_exit_request(&obddev->u.cli);
710                 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
711         }
712         if (!it) {
713                 /* For flock requests we immediatelly return without further
714                    delay and let caller deal with the rest, since rest of
715                    this function metadata processing makes no sense for flock
716                    requests anyway */
717                 RETURN(rc);
718         }
719
720         if (rc < 0) {
721                 CERROR("ldlm_cli_enqueue: %d\n", rc);
722                 mdc_clear_replay_flag(req, rc);
723                 ptlrpc_req_finished(req);
724                 RETURN(rc);
725         }
726         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
727
728         RETURN(rc);
729 }
730
731 static int mdc_finish_intent_lock(struct obd_export *exp,
732                                   struct ptlrpc_request *request,
733                                   struct md_op_data *op_data,
734                                   struct lookup_intent *it,
735                                   struct lustre_handle *lockh)
736 {
737         struct lustre_handle old_lock;
738         struct mdt_body *mdt_body;
739         struct ldlm_lock *lock;
740         int rc;
741
742
743         LASSERT(request != NULL);
744         LASSERT(request != LP_POISON);
745         LASSERT(request->rq_repmsg != LP_POISON);
746
747         if (!it_disposition(it, DISP_IT_EXECD)) {
748                 /* The server failed before it even started executing the
749                  * intent, i.e. because it couldn't unpack the request. */
750                 LASSERT(it->d.lustre.it_status != 0);
751                 RETURN(it->d.lustre.it_status);
752         }
753         rc = it_open_error(DISP_IT_EXECD, it);
754         if (rc)
755                 RETURN(rc);
756
757         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
758         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
759
760         /* If we were revalidating a fid/name pair, mark the intent in
761          * case we fail and get called again from lookup */
762         if (fid_is_sane(&op_data->op_fid2) &&
763             it->it_create_mode & M_CHECK_STALE &&
764             it->it_op != IT_GETATTR) {
765                 it_set_disposition(it, DISP_ENQ_COMPLETE);
766
767                 /* Also: did we find the same inode? */
768                 /* sever can return one of two fids:
769                  * op_fid2 - new allocated fid - if file is created.
770                  * op_fid3 - existent fid - if file only open.
771                  * op_fid3 is saved in lmv_intent_open */
772                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
773                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
774                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
775                                "\n", PFID(&op_data->op_fid2),
776                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
777                         RETURN(-ESTALE);
778                 }
779         }
780
781         rc = it_open_error(DISP_LOOKUP_EXECD, it);
782         if (rc)
783                 RETURN(rc);
784
785         /* keep requests around for the multiple phases of the call
786          * this shows the DISP_XX must guarantee we make it into the call
787          */
788         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
789             it_disposition(it, DISP_OPEN_CREATE) &&
790             !it_open_error(DISP_OPEN_CREATE, it)) {
791                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
792                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
793         }
794         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
795             it_disposition(it, DISP_OPEN_OPEN) &&
796             !it_open_error(DISP_OPEN_OPEN, it)) {
797                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
798                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
799                 /* BUG 11546 - eviction in the middle of open rpc processing */
800                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
801         }
802
803         if (it->it_op & IT_CREAT) {
804                 /* XXX this belongs in ll_create_it */
805         } else if (it->it_op == IT_OPEN) {
806                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
807         } else {
808                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
809         }
810
811         /* If we already have a matching lock, then cancel the new
812          * one.  We have to set the data here instead of in
813          * mdc_enqueue, because we need to use the child's inode as
814          * the l_ast_data to match, and that's not available until
815          * intent_finish has performed the iget().) */
816         lock = ldlm_handle2lock(lockh);
817         if (lock) {
818                 ldlm_policy_data_t policy = lock->l_policy_data;
819                 LDLM_DEBUG(lock, "matching against this");
820
821                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
822                                          &lock->l_resource->lr_name),
823                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
824                          (unsigned long)lock->l_resource->lr_name.name[0],
825                          (unsigned long)lock->l_resource->lr_name.name[1],
826                          (unsigned long)lock->l_resource->lr_name.name[2],
827                          (unsigned long)fid_seq(&mdt_body->fid1),
828                          (unsigned long)fid_oid(&mdt_body->fid1),
829                          (unsigned long)fid_ver(&mdt_body->fid1));
830                 LDLM_LOCK_PUT(lock);
831
832                 memcpy(&old_lock, lockh, sizeof(*lockh));
833                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
834                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
835                         ldlm_lock_decref_and_cancel(lockh,
836                                                     it->d.lustre.it_lock_mode);
837                         memcpy(lockh, &old_lock, sizeof(old_lock));
838                         it->d.lustre.it_lock_handle = lockh->cookie;
839                 }
840         }
841         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
842                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
843                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
844         RETURN(rc);
845 }
846
847 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
848                         struct lu_fid *fid, __u64 *bits)
849 {
850         /* We could just return 1 immediately, but since we should only
851          * be called in revalidate_it if we already have a lock, let's
852          * verify that. */
853         struct ldlm_res_id res_id;
854         struct lustre_handle lockh;
855         ldlm_policy_data_t policy;
856         ldlm_mode_t mode;
857         ENTRY;
858
859         if (it->d.lustre.it_lock_handle) {
860                 lockh.cookie = it->d.lustre.it_lock_handle;
861                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
862         } else {
863                 fid_build_reg_res_name(fid, &res_id);
864                 switch (it->it_op) {
865                 case IT_GETATTR:
866                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
867                         break;
868                 case IT_LAYOUT:
869                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
870                         break;
871                 default:
872                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
873                         break;
874                 }
875                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
876                                        LDLM_FL_BLOCK_GRANTED, &res_id,
877                                        LDLM_IBITS, &policy,
878                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
879         }
880
881         if (mode) {
882                 it->d.lustre.it_lock_handle = lockh.cookie;
883                 it->d.lustre.it_lock_mode = mode;
884         } else {
885                 it->d.lustre.it_lock_handle = 0;
886                 it->d.lustre.it_lock_mode = 0;
887         }
888
889         RETURN(!!mode);
890 }
891
892 /*
893  * This long block is all about fixing up the lock and request state
894  * so that it is correct as of the moment _before_ the operation was
895  * applied; that way, the VFS will think that everything is normal and
896  * call Lustre's regular VFS methods.
897  *
898  * If we're performing a creation, that means that unless the creation
899  * failed with EEXIST, we should fake up a negative dentry.
900  *
901  * For everything else, we want to lookup to succeed.
902  *
903  * One additional note: if CREATE or OPEN succeeded, we add an extra
904  * reference to the request because we need to keep it around until
905  * ll_create/ll_open gets called.
906  *
907  * The server will return to us, in it_disposition, an indication of
908  * exactly what d.lustre.it_status refers to.
909  *
910  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
911  * otherwise if DISP_OPEN_CREATE is set, then it status is the
912  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
913  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
914  * was successful.
915  *
916  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
917  * child lookup.
918  */
919 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
920                     void *lmm, int lmmsize, struct lookup_intent *it,
921                     int lookup_flags, struct ptlrpc_request **reqp,
922                     ldlm_blocking_callback cb_blocking,
923                     int extra_lock_flags)
924 {
925         struct lustre_handle lockh;
926         int rc = 0;
927         ENTRY;
928         LASSERT(it);
929
930         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
931                ", intent: %s flags %#o\n", op_data->op_namelen,
932                op_data->op_name, PFID(&op_data->op_fid2),
933                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
934                it->it_flags);
935
936         lockh.cookie = 0;
937         if (fid_is_sane(&op_data->op_fid2) &&
938             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
939                 /* We could just return 1 immediately, but since we should only
940                  * be called in revalidate_it if we already have a lock, let's
941                  * verify that. */
942                 it->d.lustre.it_lock_handle = 0;
943                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
944                 /* Only return failure if it was not GETATTR by cfid
945                    (from inode_revalidate) */
946                 if (rc || op_data->op_namelen != 0)
947                         RETURN(rc);
948         }
949
950         /* lookup_it may be called only after revalidate_it has run, because
951          * revalidate_it cannot return errors, only zero.  Returning zero causes
952          * this call to lookup, which *can* return an error.
953          *
954          * We only want to execute the request associated with the intent one
955          * time, however, so don't send the request again.  Instead, skip past
956          * this and use the request from revalidate.  In this case, revalidate
957          * never dropped its reference, so the refcounts are all OK */
958         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
959                 struct ldlm_enqueue_info einfo =
960                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
961                           ldlm_completion_ast, NULL, NULL, NULL };
962
963                 /* For case if upper layer did not alloc fid, do it now. */
964                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
965                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
966                         if (rc < 0) {
967                                 CERROR("Can't alloc new fid, rc %d\n", rc);
968                                 RETURN(rc);
969                         }
970                 }
971                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
972                                  lmm, lmmsize, NULL, extra_lock_flags);
973                 if (rc < 0)
974                         RETURN(rc);
975         } else if (!fid_is_sane(&op_data->op_fid2) ||
976                    !(it->it_create_mode & M_CHECK_STALE)) {
977                 /* DISP_ENQ_COMPLETE set means there is extra reference on
978                  * request referenced from this intent, saved for subsequent
979                  * lookup.  This path is executed when we proceed to this
980                  * lookup, so we clear DISP_ENQ_COMPLETE */
981                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
982         }
983         *reqp = it->d.lustre.it_data;
984         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
985         RETURN(rc);
986 }
987
988 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
989                                               struct ptlrpc_request *req,
990                                               void *args, int rc)
991 {
992         struct mdc_getattr_args  *ga = args;
993         struct obd_export        *exp = ga->ga_exp;
994         struct md_enqueue_info   *minfo = ga->ga_minfo;
995         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
996         struct lookup_intent     *it;
997         struct lustre_handle     *lockh;
998         struct obd_device        *obddev;
999         int                       flags = LDLM_FL_HAS_INTENT;
1000         ENTRY;
1001
1002         it    = &minfo->mi_it;
1003         lockh = &minfo->mi_lockh;
1004
1005         obddev = class_exp2obd(exp);
1006
1007         mdc_exit_request(&obddev->u.cli);
1008         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1009                 rc = -ETIMEDOUT;
1010
1011         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1012                                    &flags, NULL, 0, lockh, rc);
1013         if (rc < 0) {
1014                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1015                 mdc_clear_replay_flag(req, rc);
1016                 GOTO(out, rc);
1017         }
1018
1019         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1020         if (rc)
1021                 GOTO(out, rc);
1022
1023         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1024         EXIT;
1025
1026 out:
1027         OBD_FREE_PTR(einfo);
1028         minfo->mi_cb(req, minfo, rc);
1029         return 0;
1030 }
1031
1032 int mdc_intent_getattr_async(struct obd_export *exp,
1033                              struct md_enqueue_info *minfo,
1034                              struct ldlm_enqueue_info *einfo)
1035 {
1036         struct md_op_data       *op_data = &minfo->mi_data;
1037         struct lookup_intent    *it = &minfo->mi_it;
1038         struct ptlrpc_request   *req;
1039         struct mdc_getattr_args *ga;
1040         struct obd_device       *obddev = class_exp2obd(exp);
1041         struct ldlm_res_id       res_id;
1042         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1043          *     for statahead currently. Consider CMD in future, such two bits
1044          *     maybe managed by different MDS, should be adjusted then. */
1045         ldlm_policy_data_t       policy = {
1046                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1047                                                          MDS_INODELOCK_UPDATE }
1048                                  };
1049         int                      rc = 0;
1050         int                      flags = LDLM_FL_HAS_INTENT;
1051         ENTRY;
1052
1053         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1054                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1055                ldlm_it2str(it->it_op), it->it_flags);
1056
1057         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1058         req = mdc_intent_getattr_pack(exp, it, op_data);
1059         if (!req)
1060                 RETURN(-ENOMEM);
1061
1062         rc = mdc_enter_request(&obddev->u.cli);
1063         if (rc != 0) {
1064                 ptlrpc_req_finished(req);
1065                 RETURN(rc);
1066         }
1067
1068         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1069                               0, &minfo->mi_lockh, 1);
1070         if (rc < 0) {
1071                 mdc_exit_request(&obddev->u.cli);
1072                 ptlrpc_req_finished(req);
1073                 RETURN(rc);
1074         }
1075
1076         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1077         ga = ptlrpc_req_async_args(req);
1078         ga->ga_exp = exp;
1079         ga->ga_minfo = minfo;
1080         ga->ga_einfo = einfo;
1081
1082         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1083         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1084
1085         RETURN(0);
1086 }