Whamcloud - gitweb
LU-1346 libcfs: replace libcfs wrappers with kernel API
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
44 #else
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
/*
 * Groups the export, the metadata enqueue info and the LDLM enqueue info
 * that belong to one getattr enqueue.
 * NOTE(review): the code using this structure is outside the part of the
 * file reviewed here - confirm who fills it in and who consumes it.
 */
struct mdc_getattr_args {
        struct obd_export        *ga_exp;
        struct md_enqueue_info   *ga_minfo;
        struct ldlm_enqueue_info *ga_einfo;
};
61
62 int it_disposition(struct lookup_intent *it, int flag)
63 {
64         return it->d.lustre.it_disposition & flag;
65 }
66 EXPORT_SYMBOL(it_disposition);
67
68 void it_set_disposition(struct lookup_intent *it, int flag)
69 {
70         it->d.lustre.it_disposition |= flag;
71 }
72 EXPORT_SYMBOL(it_set_disposition);
73
74 void it_clear_disposition(struct lookup_intent *it, int flag)
75 {
76         it->d.lustre.it_disposition &= ~flag;
77 }
78 EXPORT_SYMBOL(it_clear_disposition);
79
80 int it_open_error(int phase, struct lookup_intent *it)
81 {
82         if (it_disposition(it, DISP_OPEN_OPEN)) {
83                 if (phase >= DISP_OPEN_OPEN)
84                         return it->d.lustre.it_status;
85                 else
86                         return 0;
87         }
88
89         if (it_disposition(it, DISP_OPEN_CREATE)) {
90                 if (phase >= DISP_OPEN_CREATE)
91                         return it->d.lustre.it_status;
92                 else
93                         return 0;
94         }
95
96         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97                 if (phase >= DISP_LOOKUP_EXECD)
98                         return it->d.lustre.it_status;
99                 else
100                         return 0;
101         }
102
103         if (it_disposition(it, DISP_IT_EXECD)) {
104                 if (phase >= DISP_IT_EXECD)
105                         return it->d.lustre.it_status;
106                 else
107                         return 0;
108         }
109         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110                it->d.lustre.it_status);
111         LBUG();
112         return 0;
113 }
114 EXPORT_SYMBOL(it_open_error);
115
116 /* this must be called on a lockh that is known to have a referenced lock */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
118                       __u64 *bits)
119 {
120         struct ldlm_lock *lock;
121         ENTRY;
122
123         if(bits)
124                 *bits = 0;
125
126         if (!*lockh)
127                 RETURN(0);
128
129         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
130
131         LASSERT(lock != NULL);
132         lock_res_and_lock(lock);
133 #ifdef __KERNEL__
134         if (lock->l_ast_data && lock->l_ast_data != data) {
135                 struct inode *new_inode = data;
136                 struct inode *old_inode = lock->l_ast_data;
137                 LASSERTF(old_inode->i_state & I_FREEING,
138                          "Found existing inode %p/%lu/%u state %lu in lock: "
139                          "setting data to %p/%lu/%u\n", old_inode,
140                          old_inode->i_ino, old_inode->i_generation,
141                          old_inode->i_state,
142                          new_inode, new_inode->i_ino, new_inode->i_generation);
143         }
144 #endif
145         lock->l_ast_data = data;
146         if (bits)
147                 *bits = lock->l_policy_data.l_inodebits.bits;
148
149         unlock_res_and_lock(lock);
150         LDLM_LOCK_PUT(lock);
151
152         RETURN(0);
153 }
154
155 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
156                            const struct lu_fid *fid, ldlm_type_t type,
157                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
158                            struct lustre_handle *lockh)
159 {
160         struct ldlm_res_id res_id;
161         ldlm_mode_t rc;
162         ENTRY;
163
164         fid_build_reg_res_name(fid, &res_id);
165         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166                              &res_id, type, policy, mode, lockh, 0);
167         RETURN(rc);
168 }
169
170 int mdc_cancel_unused(struct obd_export *exp,
171                       const struct lu_fid *fid,
172                       ldlm_policy_data_t *policy,
173                       ldlm_mode_t mode,
174                       ldlm_cancel_flags_t flags,
175                       void *opaque)
176 {
177         struct ldlm_res_id res_id;
178         struct obd_device *obd = class_exp2obd(exp);
179         int rc;
180
181         ENTRY;
182
183         fid_build_reg_res_name(fid, &res_id);
184         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
185                                              policy, mode, flags, opaque);
186         RETURN(rc);
187 }
188
189 int mdc_change_cbdata(struct obd_export *exp,
190                       const struct lu_fid *fid,
191                       ldlm_iterator_t it, void *data)
192 {
193         struct ldlm_res_id res_id;
194         ENTRY;
195
196         fid_build_reg_res_name(fid, &res_id);
197         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
198                               &res_id, it, data);
199
200         EXIT;
201         return 0;
202 }
203
204 /* find any ldlm lock of the inode in mdc
205  * return 0    not find
206  *        1    find one
207  *      < 0    error */
208 int mdc_find_cbdata(struct obd_export *exp,
209                     const struct lu_fid *fid,
210                     ldlm_iterator_t it, void *data)
211 {
212         struct ldlm_res_id res_id;
213         int rc = 0;
214         ENTRY;
215
216         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
217         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
218                                    it, data);
219         if (rc == LDLM_ITER_STOP)
220                 RETURN(1);
221         else if (rc == LDLM_ITER_CONTINUE)
222                 RETURN(0);
223         RETURN(rc);
224 }
225
226 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
227 {
228         /* Don't hold error requests for replay. */
229         if (req->rq_replay) {
230                 spin_lock(&req->rq_lock);
231                 req->rq_replay = 0;
232                 spin_unlock(&req->rq_lock);
233         }
234         if (rc && req->rq_transno != 0) {
235                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
236                 LBUG();
237         }
238 }
239
240 /* Save a large LOV EA into the request buffer so that it is available
241  * for replay.  We don't do this in the initial request because the
242  * original request doesn't need this buffer (at most it sends just the
243  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
244  * buffer and may also be difficult to allocate and save a very large
245  * request buffer for each open. (bug 5707)
246  *
247  * OOM here may cause recovery failure if lmm is needed (only for the
248  * original open if the MDS crashed just when this client also OOM'd)
249  * but this is incredibly unlikely, and questionable whether the client
250  * could do MDS recovery under OOM anyways... */
251 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
252                                 struct mdt_body *body)
253 {
254         int     rc;
255
256         /* FIXME: remove this explicit offset. */
257         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
258                                         body->eadatasize);
259         if (rc) {
260                 CERROR("Can't enlarge segment %d size to %d\n",
261                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
262                 body->valid &= ~OBD_MD_FLEASIZE;
263                 body->eadatasize = 0;
264         }
265 }
266
267 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
268                                                    struct lookup_intent *it,
269                                                    struct md_op_data *op_data,
270                                                    void *lmm, int lmmsize,
271                                                    void *cb_data)
272 {
273         struct ptlrpc_request *req;
274         struct obd_device     *obddev = class_exp2obd(exp);
275         struct ldlm_intent    *lit;
276         CFS_LIST_HEAD(cancels);
277         int                    count = 0;
278         int                    mode;
279         int                    rc;
280         ENTRY;
281
282         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
283
284         /* XXX: openlock is not cancelled for cross-refs. */
285         /* If inode is known, cancel conflicting OPEN locks. */
286         if (fid_is_sane(&op_data->op_fid2)) {
287                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
288                         mode = LCK_CW;
289 #ifdef FMODE_EXEC
290                 else if (it->it_flags & FMODE_EXEC)
291                         mode = LCK_PR;
292 #endif
293                 else
294                         mode = LCK_CR;
295                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
296                                                 &cancels, mode,
297                                                 MDS_INODELOCK_OPEN);
298         }
299
300         /* If CREATE, cancel parent's UPDATE lock. */
301         if (it->it_op & IT_CREAT)
302                 mode = LCK_EX;
303         else
304                 mode = LCK_CR;
305         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
306                                          &cancels, mode,
307                                          MDS_INODELOCK_UPDATE);
308
309         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
310                                    &RQF_LDLM_INTENT_OPEN);
311         if (req == NULL) {
312                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
313                 RETURN(ERR_PTR(-ENOMEM));
314         }
315
316         /* parent capability */
317         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
318         /* child capability, reserve the size according to parent capa, it will
319          * be filled after we get the reply */
320         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
321
322         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
323                              op_data->op_namelen + 1);
324         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
325                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
326
327         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
328         if (rc) {
329                 ptlrpc_request_free(req);
330                 return NULL;
331         }
332
333         spin_lock(&req->rq_lock);
334         req->rq_replay = req->rq_import->imp_replayable;
335         spin_unlock(&req->rq_lock);
336
337         /* pack the intent */
338         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
339         lit->opc = (__u64)it->it_op;
340
341         /* pack the intended request */
342         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
343                       lmmsize);
344
345         /* for remote client, fetch remote perm for current user */
346         if (client_is_remote(exp))
347                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
348                                      sizeof(struct mdt_remote_perm));
349         ptlrpc_request_set_replen(req);
350         return req;
351 }
352
353 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
354                                                      struct lookup_intent *it,
355                                                      struct md_op_data *op_data)
356 {
357         struct ptlrpc_request *req;
358         struct obd_device     *obddev = class_exp2obd(exp);
359         struct ldlm_intent    *lit;
360         int                    rc;
361         ENTRY;
362
363         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
364                                    &RQF_LDLM_INTENT_UNLINK);
365         if (req == NULL)
366                 RETURN(ERR_PTR(-ENOMEM));
367
368         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
369         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
370                              op_data->op_namelen + 1);
371
372         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
373         if (rc) {
374                 ptlrpc_request_free(req);
375                 RETURN(ERR_PTR(rc));
376         }
377
378         /* pack the intent */
379         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
380         lit->opc = (__u64)it->it_op;
381
382         /* pack the intended request */
383         mdc_unlink_pack(req, op_data);
384
385         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
386                              obddev->u.cli.cl_max_mds_easize);
387         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
388                              obddev->u.cli.cl_max_mds_cookiesize);
389         ptlrpc_request_set_replen(req);
390         RETURN(req);
391 }
392
393 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
394                                                       struct lookup_intent *it,
395                                                       struct md_op_data *op_data)
396 {
397         struct ptlrpc_request *req;
398         struct obd_device     *obddev = class_exp2obd(exp);
399         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
400                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
401                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
402                                        (client_is_remote(exp) ?
403                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
404         struct ldlm_intent    *lit;
405         int                    rc;
406         ENTRY;
407
408         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
409                                    &RQF_LDLM_INTENT_GETATTR);
410         if (req == NULL)
411                 RETURN(ERR_PTR(-ENOMEM));
412
413         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
414         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
415                              op_data->op_namelen + 1);
416
417         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
418         if (rc) {
419                 ptlrpc_request_free(req);
420                 RETURN(ERR_PTR(rc));
421         }
422
423         /* pack the intent */
424         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
425         lit->opc = (__u64)it->it_op;
426
427         /* pack the intended request */
428         mdc_getattr_pack(req, valid, it->it_flags, op_data,
429                          obddev->u.cli.cl_max_mds_easize);
430
431         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
432                              obddev->u.cli.cl_max_mds_easize);
433         if (client_is_remote(exp))
434                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
435                                      sizeof(struct mdt_remote_perm));
436         ptlrpc_request_set_replen(req);
437         RETURN(req);
438 }
439
440 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
441 {
442         struct ptlrpc_request *req;
443         int rc;
444         ENTRY;
445
446         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
447         if (req == NULL)
448                 RETURN(ERR_PTR(-ENOMEM));
449
450         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 RETURN(ERR_PTR(rc));
454         }
455
456         ptlrpc_request_set_replen(req);
457         RETURN(req);
458 }
459
460 static int mdc_finish_enqueue(struct obd_export *exp,
461                               struct ptlrpc_request *req,
462                               struct ldlm_enqueue_info *einfo,
463                               struct lookup_intent *it,
464                               struct lustre_handle *lockh,
465                               int rc)
466 {
467         struct req_capsule  *pill = &req->rq_pill;
468         struct ldlm_request *lockreq;
469         struct ldlm_reply   *lockrep;
470         struct lustre_intent_data *intent = &it->d.lustre;
471         ENTRY;
472
473         LASSERT(rc >= 0);
474         /* Similarly, if we're going to replay this request, we don't want to
475          * actually get a lock, just perform the intent. */
476         if (req->rq_transno || req->rq_replay) {
477                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
478                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
479         }
480
481         if (rc == ELDLM_LOCK_ABORTED) {
482                 einfo->ei_mode = 0;
483                 memset(lockh, 0, sizeof(*lockh));
484                 rc = 0;
485         } else { /* rc = 0 */
486                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
487                 LASSERT(lock);
488
489                 /* If the server gave us back a different lock mode, we should
490                  * fix up our variables. */
491                 if (lock->l_req_mode != einfo->ei_mode) {
492                         ldlm_lock_addref(lockh, lock->l_req_mode);
493                         ldlm_lock_decref(lockh, einfo->ei_mode);
494                         einfo->ei_mode = lock->l_req_mode;
495                 }
496                 LDLM_LOCK_PUT(lock);
497         }
498
499         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
500         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
501
502         intent->it_disposition = (int)lockrep->lock_policy_res1;
503         intent->it_status = (int)lockrep->lock_policy_res2;
504         intent->it_lock_mode = einfo->ei_mode;
505         intent->it_lock_handle = lockh->cookie;
506         intent->it_data = req;
507
508         /* Technically speaking rq_transno must already be zero if
509          * it_status is in error, so the check is a bit redundant */
510         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
511                 mdc_clear_replay_flag(req, intent->it_status);
512
513         /* If we're doing an IT_OPEN which did not result in an actual
514          * successful open, then we need to remove the bit which saves
515          * this request for unconditional replay.
516          *
517          * It's important that we do this first!  Otherwise we might exit the
518          * function without doing so, and try to replay a failed create
519          * (bug 3440) */
520         if (it->it_op & IT_OPEN && req->rq_replay &&
521             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
522                 mdc_clear_replay_flag(req, intent->it_status);
523
524         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
525                   it->it_op, intent->it_disposition, intent->it_status);
526
527         /* We know what to expect, so we do any byte flipping required here */
528         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
529                 struct mdt_body *body;
530
531                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
532                 if (body == NULL) {
533                         CERROR ("Can't swab mdt_body\n");
534                         RETURN (-EPROTO);
535                 }
536
537                 if (it_disposition(it, DISP_OPEN_OPEN) &&
538                     !it_open_error(DISP_OPEN_OPEN, it)) {
539                         /*
540                          * If this is a successful OPEN request, we need to set
541                          * replay handler and data early, so that if replay
542                          * happens immediately after swabbing below, new reply
543                          * is swabbed by that handler correctly.
544                          */
545                         mdc_set_open_replay_data(NULL, NULL, req);
546                 }
547
548                 /* TODO: make sure LAYOUT lock must be granted along with EA */
549
550                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
551                         void *eadata;
552
553                          mdc_update_max_ea_from_body(exp, body);
554
555                         /*
556                          * The eadata is opaque; just check that it is there.
557                          * Eventually, obd_unpackmd() will check the contents.
558                          */
559                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
560                                                               body->eadatasize);
561                         if (eadata == NULL)
562                                 RETURN(-EPROTO);
563
564                         /*
565                          * We save the reply LOV EA in case we have to replay a
566                          * create for recovery.  If we didn't allocate a large
567                          * enough request buffer above we need to reallocate it
568                          * here to hold the actual LOV EA.
569                          *
570                          * To not save LOV EA if request is not going to replay
571                          * (for example error one).
572                          */
573                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
574                                 void *lmm;
575                                 if (req_capsule_get_size(pill, &RMF_EADATA,
576                                                          RCL_CLIENT) <
577                                     body->eadatasize)
578                                         mdc_realloc_openmsg(req, body);
579                                 else
580                                         req_capsule_shrink(pill, &RMF_EADATA,
581                                                            body->eadatasize,
582                                                            RCL_CLIENT);
583
584                                 req_capsule_set_size(pill, &RMF_EADATA,
585                                                      RCL_CLIENT,
586                                                      body->eadatasize);
587
588                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
589                                 if (lmm)
590                                         memcpy(lmm, eadata, body->eadatasize);
591                         }
592                 }
593
594                 if (body->valid & OBD_MD_FLRMTPERM) {
595                         struct mdt_remote_perm *perm;
596
597                         LASSERT(client_is_remote(exp));
598                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
599                                                 lustre_swab_mdt_remote_perm);
600                         if (perm == NULL)
601                                 RETURN(-EPROTO);
602                 }
603                 if (body->valid & OBD_MD_FLMDSCAPA) {
604                         struct lustre_capa *capa, *p;
605
606                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
607                         if (capa == NULL)
608                                 RETURN(-EPROTO);
609
610                         if (it->it_op & IT_OPEN) {
611                                 /* client fid capa will be checked in replay */
612                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
613                                 LASSERT(p);
614                                 *p = *capa;
615                         }
616                 }
617                 if (body->valid & OBD_MD_FLOSSCAPA) {
618                         struct lustre_capa *capa;
619
620                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
621                         if (capa == NULL)
622                                 RETURN(-EPROTO);
623                 }
624         } else if (it->it_op & IT_LAYOUT) {
625                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
626
627                 if (lock != NULL && lock->l_lvb_data == NULL) {
628                         int lvb_len;
629
630                         /* maybe the lock was granted right away and layout
631                          * is packed into RMF_DLM_LVB of req */
632                         lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB,
633                                                        RCL_SERVER);
634                         if (lvb_len > 0) {
635                                 void *lvb;
636                                 void *lmm;
637
638                                 lvb = req_capsule_server_get(pill,
639                                                              &RMF_DLM_LVB);
640                                 if (lvb == NULL) {
641                                         LDLM_LOCK_PUT(lock);
642                                         RETURN(-EPROTO);
643                                 }
644
645                                 OBD_ALLOC_LARGE(lmm, lvb_len);
646                                 if (lmm == NULL) {
647                                         LDLM_LOCK_PUT(lock);
648                                         RETURN(-ENOMEM);
649                                 }
650                                 memcpy(lmm, lvb, lvb_len);
651
652                                 /* install lvb_data */
653                                 lock_res_and_lock(lock);
654                                 LASSERT(lock->l_lvb_data == NULL);
655                                 lock->l_lvb_data = lmm;
656                                 lock->l_lvb_len = lvb_len;
657                                 unlock_res_and_lock(lock);
658                         }
659                 }
660                 if (lock != NULL)
661                         LDLM_LOCK_PUT(lock);
662         }
663
664         RETURN(rc);
665 }
666
667 /* We always reserve enough space in the reply packet for a stripe MD, because
668  * we don't know in advance the file type. */
669 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
670                 struct lookup_intent *it, struct md_op_data *op_data,
671                 struct lustre_handle *lockh, void *lmm, int lmmsize,
672                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
673 {
674         struct obd_device     *obddev = class_exp2obd(exp);
675         struct ptlrpc_request *req = NULL;
676         __u64                  flags, saved_flags = extra_lock_flags;
677         int                    rc;
678         struct ldlm_res_id res_id;
679         static const ldlm_policy_data_t lookup_policy =
680                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
681         static const ldlm_policy_data_t update_policy =
682                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
683         static const ldlm_policy_data_t layout_policy =
684                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
685         ldlm_policy_data_t const *policy = &lookup_policy;
686         int                    generation, resends = 0;
687         struct ldlm_reply     *lockrep;
688         ENTRY;
689
690         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
691                  einfo->ei_type);
692
693         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
694
695         if (it) {
696                 saved_flags |= LDLM_FL_HAS_INTENT;
697                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
698                         policy = &update_policy;
699                 else if (it->it_op & IT_LAYOUT)
700                         policy = &layout_policy;
701         }
702
703         LASSERT(reqp == NULL);
704
705         generation = obddev->u.cli.cl_import->imp_generation;
706 resend:
707         flags = saved_flags;
708         if (!it) {
709                 /* The only way right now is FLOCK, in this case we hide flock
710                    policy as lmm, but lmmsize is 0 */
711                 LASSERT(lmm && lmmsize == 0);
712                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
713                          einfo->ei_type);
714                 policy = (ldlm_policy_data_t *)lmm;
715                 res_id.name[3] = LDLM_FLOCK;
716         } else if (it->it_op & IT_OPEN) {
717                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
718                                            einfo->ei_cbdata);
719                 policy = &update_policy;
720                 einfo->ei_cbdata = NULL;
721                 lmm = NULL;
722         } else if (it->it_op & IT_UNLINK)
723                 req = mdc_intent_unlink_pack(exp, it, op_data);
724         else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
725                 req = mdc_intent_getattr_pack(exp, it, op_data);
726         else if (it->it_op & (IT_READDIR | IT_LAYOUT))
727                 req = ldlm_enqueue_pack(exp);
728         else {
729                 LBUG();
730                 RETURN(-EINVAL);
731         }
732
733         if (IS_ERR(req))
734                 RETURN(PTR_ERR(req));
735
736         if (req != NULL && it && it->it_op & IT_CREAT)
737                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
738                  * retry logic */
739                 req->rq_no_retry_einprogress = 1;
740
741         if (resends) {
742                 req->rq_generation_set = 1;
743                 req->rq_import_generation = generation;
744                 req->rq_sent = cfs_time_current_sec() + resends;
745         }
746
747         /* It is important to obtain rpc_lock first (if applicable), so that
748          * threads that are serialised with rpc_lock are not polluting our
749          * rpcs in flight counter. We do not do flock request limiting, though*/
750         if (it) {
751                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
752                 rc = mdc_enter_request(&obddev->u.cli);
753                 if (rc != 0) {
754                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
755                         mdc_clear_replay_flag(req, 0);
756                         ptlrpc_req_finished(req);
757                         RETURN(rc);
758                 }
759         }
760
761         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
762                               0, lockh, 0);
763         if (!it) {
764                 /* For flock requests we immediately return without further
765                    delay and let caller deal with the rest, since rest of
766                    this function metadata processing makes no sense for flock
767                    requests anyway */
768                 RETURN(rc);
769         }
770
771         mdc_exit_request(&obddev->u.cli);
772         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
773
774         if (rc < 0) {
775                 CERROR("ldlm_cli_enqueue: %d\n", rc);
776                 mdc_clear_replay_flag(req, rc);
777                 ptlrpc_req_finished(req);
778                 RETURN(rc);
779         }
780
781         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
782         LASSERT(lockrep != NULL);
783
784         /* Retry the create infinitely when we get -EINPROGRESS from
785          * server. This is required by the new quota design. */
786         if (it && it->it_op & IT_CREAT &&
787             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
788                 mdc_clear_replay_flag(req, rc);
789                 ptlrpc_req_finished(req);
790                 resends++;
791
792                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
793                        obddev->obd_name, resends, it->it_op,
794                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
795
796                 if (generation == obddev->u.cli.cl_import->imp_generation) {
797                         goto resend;
798                 } else {
799                         CDEBUG(D_HA, "resend cross eviction\n");
800                         RETURN(-EIO);
801                 }
802         }
803
804         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
805
806         RETURN(rc);
807 }
808
809 static int mdc_finish_intent_lock(struct obd_export *exp,
810                                   struct ptlrpc_request *request,
811                                   struct md_op_data *op_data,
812                                   struct lookup_intent *it,
813                                   struct lustre_handle *lockh)
814 {
815         struct lustre_handle old_lock;
816         struct mdt_body *mdt_body;
817         struct ldlm_lock *lock;
818         int rc;
819
820
821         LASSERT(request != NULL);
822         LASSERT(request != LP_POISON);
823         LASSERT(request->rq_repmsg != LP_POISON);
824
825         if (!it_disposition(it, DISP_IT_EXECD)) {
826                 /* The server failed before it even started executing the
827                  * intent, i.e. because it couldn't unpack the request. */
828                 LASSERT(it->d.lustre.it_status != 0);
829                 RETURN(it->d.lustre.it_status);
830         }
831         rc = it_open_error(DISP_IT_EXECD, it);
832         if (rc)
833                 RETURN(rc);
834
835         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
836         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
837
838         /* If we were revalidating a fid/name pair, mark the intent in
839          * case we fail and get called again from lookup */
840         if (fid_is_sane(&op_data->op_fid2) &&
841             it->it_create_mode & M_CHECK_STALE &&
842             it->it_op != IT_GETATTR) {
843                 it_set_disposition(it, DISP_ENQ_COMPLETE);
844
845                 /* Also: did we find the same inode? */
846                 /* sever can return one of two fids:
847                  * op_fid2 - new allocated fid - if file is created.
848                  * op_fid3 - existent fid - if file only open.
849                  * op_fid3 is saved in lmv_intent_open */
850                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
851                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
852                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
853                                "\n", PFID(&op_data->op_fid2),
854                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
855                         RETURN(-ESTALE);
856                 }
857         }
858
859         rc = it_open_error(DISP_LOOKUP_EXECD, it);
860         if (rc)
861                 RETURN(rc);
862
863         /* keep requests around for the multiple phases of the call
864          * this shows the DISP_XX must guarantee we make it into the call
865          */
866         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
867             it_disposition(it, DISP_OPEN_CREATE) &&
868             !it_open_error(DISP_OPEN_CREATE, it)) {
869                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
870                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
871         }
872         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
873             it_disposition(it, DISP_OPEN_OPEN) &&
874             !it_open_error(DISP_OPEN_OPEN, it)) {
875                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
876                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
877                 /* BUG 11546 - eviction in the middle of open rpc processing */
878                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
879         }
880
881         if (it->it_op & IT_CREAT) {
882                 /* XXX this belongs in ll_create_it */
883         } else if (it->it_op == IT_OPEN) {
884                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
885         } else {
886                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
887         }
888
889         /* If we already have a matching lock, then cancel the new
890          * one.  We have to set the data here instead of in
891          * mdc_enqueue, because we need to use the child's inode as
892          * the l_ast_data to match, and that's not available until
893          * intent_finish has performed the iget().) */
894         lock = ldlm_handle2lock(lockh);
895         if (lock) {
896                 ldlm_policy_data_t policy = lock->l_policy_data;
897                 LDLM_DEBUG(lock, "matching against this");
898
899                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
900                                          &lock->l_resource->lr_name),
901                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
902                          (unsigned long)lock->l_resource->lr_name.name[0],
903                          (unsigned long)lock->l_resource->lr_name.name[1],
904                          (unsigned long)lock->l_resource->lr_name.name[2],
905                          (unsigned long)fid_seq(&mdt_body->fid1),
906                          (unsigned long)fid_oid(&mdt_body->fid1),
907                          (unsigned long)fid_ver(&mdt_body->fid1));
908                 LDLM_LOCK_PUT(lock);
909
910                 memcpy(&old_lock, lockh, sizeof(*lockh));
911                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
912                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
913                         ldlm_lock_decref_and_cancel(lockh,
914                                                     it->d.lustre.it_lock_mode);
915                         memcpy(lockh, &old_lock, sizeof(old_lock));
916                         it->d.lustre.it_lock_handle = lockh->cookie;
917                 }
918         }
919         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
920                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
921                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
922         RETURN(rc);
923 }
924
925 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
926                         struct lu_fid *fid, __u64 *bits)
927 {
928         /* We could just return 1 immediately, but since we should only
929          * be called in revalidate_it if we already have a lock, let's
930          * verify that. */
931         struct ldlm_res_id res_id;
932         struct lustre_handle lockh;
933         ldlm_policy_data_t policy;
934         ldlm_mode_t mode;
935         ENTRY;
936
937         if (it->d.lustre.it_lock_handle) {
938                 lockh.cookie = it->d.lustre.it_lock_handle;
939                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
940         } else {
941                 fid_build_reg_res_name(fid, &res_id);
942                 switch (it->it_op) {
943                 case IT_GETATTR:
944                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
945                         break;
946                 case IT_LAYOUT:
947                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
948                         break;
949                 default:
950                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
951                         break;
952                 }
953                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
954                                        LDLM_FL_BLOCK_GRANTED, &res_id,
955                                        LDLM_IBITS, &policy,
956                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
957         }
958
959         if (mode) {
960                 it->d.lustre.it_lock_handle = lockh.cookie;
961                 it->d.lustre.it_lock_mode = mode;
962         } else {
963                 it->d.lustre.it_lock_handle = 0;
964                 it->d.lustre.it_lock_mode = 0;
965         }
966
967         RETURN(!!mode);
968 }
969
970 /*
971  * This long block is all about fixing up the lock and request state
972  * so that it is correct as of the moment _before_ the operation was
973  * applied; that way, the VFS will think that everything is normal and
974  * call Lustre's regular VFS methods.
975  *
976  * If we're performing a creation, that means that unless the creation
977  * failed with EEXIST, we should fake up a negative dentry.
978  *
979  * For everything else, we want to lookup to succeed.
980  *
981  * One additional note: if CREATE or OPEN succeeded, we add an extra
982  * reference to the request because we need to keep it around until
983  * ll_create/ll_open gets called.
984  *
985  * The server will return to us, in it_disposition, an indication of
986  * exactly what d.lustre.it_status refers to.
987  *
988  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
989  * otherwise if DISP_OPEN_CREATE is set, then it status is the
990  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
991  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
992  * was successful.
993  *
994  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
995  * child lookup.
996  */
997 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
998                     void *lmm, int lmmsize, struct lookup_intent *it,
999                     int lookup_flags, struct ptlrpc_request **reqp,
1000                     ldlm_blocking_callback cb_blocking,
1001                     __u64 extra_lock_flags)
1002 {
1003         struct lustre_handle lockh;
1004         int rc = 0;
1005         ENTRY;
1006         LASSERT(it);
1007
1008         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1009                ", intent: %s flags %#o\n", op_data->op_namelen,
1010                op_data->op_name, PFID(&op_data->op_fid2),
1011                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1012                it->it_flags);
1013
1014         lockh.cookie = 0;
1015         if (fid_is_sane(&op_data->op_fid2) &&
1016             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
1017                 /* We could just return 1 immediately, but since we should only
1018                  * be called in revalidate_it if we already have a lock, let's
1019                  * verify that. */
1020                 it->d.lustre.it_lock_handle = 0;
1021                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1022                 /* Only return failure if it was not GETATTR by cfid
1023                    (from inode_revalidate) */
1024                 if (rc || op_data->op_namelen != 0)
1025                         RETURN(rc);
1026         }
1027
1028         /* lookup_it may be called only after revalidate_it has run, because
1029          * revalidate_it cannot return errors, only zero.  Returning zero causes
1030          * this call to lookup, which *can* return an error.
1031          *
1032          * We only want to execute the request associated with the intent one
1033          * time, however, so don't send the request again.  Instead, skip past
1034          * this and use the request from revalidate.  In this case, revalidate
1035          * never dropped its reference, so the refcounts are all OK */
1036         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1037                 struct ldlm_enqueue_info einfo =
1038                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1039                           ldlm_completion_ast, NULL, NULL, NULL };
1040
1041                 /* For case if upper layer did not alloc fid, do it now. */
1042                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1043                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1044                         if (rc < 0) {
1045                                 CERROR("Can't alloc new fid, rc %d\n", rc);
1046                                 RETURN(rc);
1047                         }
1048                 }
1049                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1050                                  lmm, lmmsize, NULL, extra_lock_flags);
1051                 if (rc < 0)
1052                         RETURN(rc);
1053         } else if (!fid_is_sane(&op_data->op_fid2) ||
1054                    !(it->it_create_mode & M_CHECK_STALE)) {
1055                 /* DISP_ENQ_COMPLETE set means there is extra reference on
1056                  * request referenced from this intent, saved for subsequent
1057                  * lookup.  This path is executed when we proceed to this
1058                  * lookup, so we clear DISP_ENQ_COMPLETE */
1059                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1060         }
1061         *reqp = it->d.lustre.it_data;
1062         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1063         RETURN(rc);
1064 }
1065
1066 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1067                                               struct ptlrpc_request *req,
1068                                               void *args, int rc)
1069 {
1070         struct mdc_getattr_args  *ga = args;
1071         struct obd_export        *exp = ga->ga_exp;
1072         struct md_enqueue_info   *minfo = ga->ga_minfo;
1073         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1074         struct lookup_intent     *it;
1075         struct lustre_handle     *lockh;
1076         struct obd_device        *obddev;
1077         __u64                     flags = LDLM_FL_HAS_INTENT;
1078         ENTRY;
1079
1080         it    = &minfo->mi_it;
1081         lockh = &minfo->mi_lockh;
1082
1083         obddev = class_exp2obd(exp);
1084
1085         mdc_exit_request(&obddev->u.cli);
1086         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1087                 rc = -ETIMEDOUT;
1088
1089         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1090                                    &flags, NULL, 0, lockh, rc);
1091         if (rc < 0) {
1092                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1093                 mdc_clear_replay_flag(req, rc);
1094                 GOTO(out, rc);
1095         }
1096
1097         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1098         if (rc)
1099                 GOTO(out, rc);
1100
1101         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1102         EXIT;
1103
1104 out:
1105         OBD_FREE_PTR(einfo);
1106         minfo->mi_cb(req, minfo, rc);
1107         return 0;
1108 }
1109
1110 int mdc_intent_getattr_async(struct obd_export *exp,
1111                              struct md_enqueue_info *minfo,
1112                              struct ldlm_enqueue_info *einfo)
1113 {
1114         struct md_op_data       *op_data = &minfo->mi_data;
1115         struct lookup_intent    *it = &minfo->mi_it;
1116         struct ptlrpc_request   *req;
1117         struct mdc_getattr_args *ga;
1118         struct obd_device       *obddev = class_exp2obd(exp);
1119         struct ldlm_res_id       res_id;
1120         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1121          *     for statahead currently. Consider CMD in future, such two bits
1122          *     maybe managed by different MDS, should be adjusted then. */
1123         ldlm_policy_data_t       policy = {
1124                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1125                                                          MDS_INODELOCK_UPDATE }
1126                                  };
1127         int                      rc = 0;
1128         __u64                    flags = LDLM_FL_HAS_INTENT;
1129         ENTRY;
1130
1131         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1132                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1133                ldlm_it2str(it->it_op), it->it_flags);
1134
1135         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1136         req = mdc_intent_getattr_pack(exp, it, op_data);
1137         if (!req)
1138                 RETURN(-ENOMEM);
1139
1140         rc = mdc_enter_request(&obddev->u.cli);
1141         if (rc != 0) {
1142                 ptlrpc_req_finished(req);
1143                 RETURN(rc);
1144         }
1145
1146         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1147                               0, &minfo->mi_lockh, 1);
1148         if (rc < 0) {
1149                 mdc_exit_request(&obddev->u.cli);
1150                 ptlrpc_req_finished(req);
1151                 RETURN(rc);
1152         }
1153
1154         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1155         ga = ptlrpc_req_async_args(req);
1156         ga->ga_exp = exp;
1157         ga->ga_minfo = minfo;
1158         ga->ga_einfo = einfo;
1159
1160         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1161         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1162
1163         RETURN(0);
1164 }