Whamcloud - gitweb
1d5f6cc1ac9b23a4f63b5c0c3c5c94bf4132e9b5
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
58
/* Bundle of three pointers: an export, a metadata enqueue descriptor and an
 * LDLM enqueue descriptor.
 * NOTE(review): the name suggests this is the argument block of a getattr
 * enqueue path; no user is visible in this part of the file -- confirm
 * against the code that fills it in. */
struct mdc_getattr_args {
        struct obd_export           *ga_exp;    /* export the request is issued on (presumably) */
        struct md_enqueue_info      *ga_minfo;  /* metadata enqueue descriptor */
        struct ldlm_enqueue_info    *ga_einfo;  /* LDLM enqueue descriptor */
};
64
65 int it_disposition(struct lookup_intent *it, int flag)
66 {
67         return it->d.lustre.it_disposition & flag;
68 }
69 EXPORT_SYMBOL(it_disposition);
70
71 void it_set_disposition(struct lookup_intent *it, int flag)
72 {
73         it->d.lustre.it_disposition |= flag;
74 }
75 EXPORT_SYMBOL(it_set_disposition);
76
77 void it_clear_disposition(struct lookup_intent *it, int flag)
78 {
79         it->d.lustre.it_disposition &= ~flag;
80 }
81 EXPORT_SYMBOL(it_clear_disposition);
82
83 int it_open_error(int phase, struct lookup_intent *it)
84 {
85         if (it_disposition(it, DISP_OPEN_OPEN)) {
86                 if (phase >= DISP_OPEN_OPEN)
87                         return it->d.lustre.it_status;
88                 else
89                         return 0;
90         }
91
92         if (it_disposition(it, DISP_OPEN_CREATE)) {
93                 if (phase >= DISP_OPEN_CREATE)
94                         return it->d.lustre.it_status;
95                 else
96                         return 0;
97         }
98
99         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
100                 if (phase >= DISP_LOOKUP_EXECD)
101                         return it->d.lustre.it_status;
102                 else
103                         return 0;
104         }
105
106         if (it_disposition(it, DISP_IT_EXECD)) {
107                 if (phase >= DISP_IT_EXECD)
108                         return it->d.lustre.it_status;
109                 else
110                         return 0;
111         }
112         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
113                it->d.lustre.it_status);
114         LBUG();
115         return 0;
116 }
117 EXPORT_SYMBOL(it_open_error);
118
119 /* this must be called on a lockh that is known to have a referenced lock */
120 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
121                       __u64 *bits)
122 {
123         struct ldlm_lock *lock;
124         ENTRY;
125
126         if(bits)
127                 *bits = 0;
128
129         if (!*lockh)
130                 RETURN(0);
131
132         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
133
134         LASSERT(lock != NULL);
135         lock_res_and_lock(lock);
136 #ifdef __KERNEL__
137         if (lock->l_ast_data && lock->l_ast_data != data) {
138                 struct inode *new_inode = data;
139                 struct inode *old_inode = lock->l_ast_data;
140                 LASSERTF(old_inode->i_state & I_FREEING,
141                          "Found existing inode %p/%lu/%u state %lu in lock: "
142                          "setting data to %p/%lu/%u\n", old_inode,
143                          old_inode->i_ino, old_inode->i_generation,
144                          old_inode->i_state,
145                          new_inode, new_inode->i_ino, new_inode->i_generation);
146         }
147 #endif
148         lock->l_ast_data = data;
149         if (bits)
150                 *bits = lock->l_policy_data.l_inodebits.bits;
151
152         unlock_res_and_lock(lock);
153         LDLM_LOCK_PUT(lock);
154
155         RETURN(0);
156 }
157
158 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
159                            const struct lu_fid *fid, ldlm_type_t type,
160                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
161                            struct lustre_handle *lockh)
162 {
163         struct ldlm_res_id res_id;
164         ldlm_mode_t rc;
165         ENTRY;
166
167         fid_build_reg_res_name(fid, &res_id);
168         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
169                              &res_id, type, policy, mode, lockh, 0);
170         RETURN(rc);
171 }
172
173 int mdc_cancel_unused(struct obd_export *exp,
174                       const struct lu_fid *fid,
175                       ldlm_policy_data_t *policy,
176                       ldlm_mode_t mode,
177                       ldlm_cancel_flags_t flags,
178                       void *opaque)
179 {
180         struct ldlm_res_id res_id;
181         struct obd_device *obd = class_exp2obd(exp);
182         int rc;
183
184         ENTRY;
185
186         fid_build_reg_res_name(fid, &res_id);
187         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
188                                              policy, mode, flags, opaque);
189         RETURN(rc);
190 }
191
192 int mdc_change_cbdata(struct obd_export *exp,
193                       const struct lu_fid *fid,
194                       ldlm_iterator_t it, void *data)
195 {
196         struct ldlm_res_id res_id;
197         ENTRY;
198
199         fid_build_reg_res_name(fid, &res_id);
200         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
201                               &res_id, it, data);
202
203         EXIT;
204         return 0;
205 }
206
207 /* find any ldlm lock of the inode in mdc
208  * return 0    not find
209  *        1    find one
210  *      < 0    error */
211 int mdc_find_cbdata(struct obd_export *exp,
212                     const struct lu_fid *fid,
213                     ldlm_iterator_t it, void *data)
214 {
215         struct ldlm_res_id res_id;
216         int rc = 0;
217         ENTRY;
218
219         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
220         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
221                                    it, data);
222         if (rc == LDLM_ITER_STOP)
223                 RETURN(1);
224         else if (rc == LDLM_ITER_CONTINUE)
225                 RETURN(0);
226         RETURN(rc);
227 }
228
229 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
230 {
231         /* Don't hold error requests for replay. */
232         if (req->rq_replay) {
233                 cfs_spin_lock(&req->rq_lock);
234                 req->rq_replay = 0;
235                 cfs_spin_unlock(&req->rq_lock);
236         }
237         if (rc && req->rq_transno != 0) {
238                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
239                 LBUG();
240         }
241 }
242
243 /* Save a large LOV EA into the request buffer so that it is available
244  * for replay.  We don't do this in the initial request because the
245  * original request doesn't need this buffer (at most it sends just the
246  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
247  * buffer and may also be difficult to allocate and save a very large
248  * request buffer for each open. (bug 5707)
249  *
250  * OOM here may cause recovery failure if lmm is needed (only for the
251  * original open if the MDS crashed just when this client also OOM'd)
252  * but this is incredibly unlikely, and questionable whether the client
253  * could do MDS recovery under OOM anyways... */
254 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
255                                 struct mdt_body *body)
256 {
257         int     rc;
258
259         /* FIXME: remove this explicit offset. */
260         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
261                                         body->eadatasize);
262         if (rc) {
263                 CERROR("Can't enlarge segment %d size to %d\n",
264                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
265                 body->valid &= ~OBD_MD_FLEASIZE;
266                 body->eadatasize = 0;
267         }
268 }
269
270 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
271                                                    struct lookup_intent *it,
272                                                    struct md_op_data *op_data,
273                                                    void *lmm, int lmmsize,
274                                                    void *cb_data)
275 {
276         struct ptlrpc_request *req;
277         struct obd_device     *obddev = class_exp2obd(exp);
278         struct ldlm_intent    *lit;
279         CFS_LIST_HEAD(cancels);
280         int                    count = 0;
281         int                    mode;
282         int                    rc;
283         ENTRY;
284
285         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
286
287         /* XXX: openlock is not cancelled for cross-refs. */
288         /* If inode is known, cancel conflicting OPEN locks. */
289         if (fid_is_sane(&op_data->op_fid2)) {
290                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
291                         mode = LCK_CW;
292 #ifdef FMODE_EXEC
293                 else if (it->it_flags & FMODE_EXEC)
294                         mode = LCK_PR;
295 #endif
296                 else
297                         mode = LCK_CR;
298                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
299                                                 &cancels, mode,
300                                                 MDS_INODELOCK_OPEN);
301         }
302
303         /* If CREATE, cancel parent's UPDATE lock. */
304         if (it->it_op & IT_CREAT)
305                 mode = LCK_EX;
306         else
307                 mode = LCK_CR;
308         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
309                                          &cancels, mode,
310                                          MDS_INODELOCK_UPDATE);
311
312         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
313                                    &RQF_LDLM_INTENT_OPEN);
314         if (req == NULL) {
315                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
316                 RETURN(ERR_PTR(-ENOMEM));
317         }
318
319         /* parent capability */
320         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
321         /* child capability, reserve the size according to parent capa, it will
322          * be filled after we get the reply */
323         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
324
325         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
326                              op_data->op_namelen + 1);
327         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
328                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
329
330         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
331         if (rc) {
332                 ptlrpc_request_free(req);
333                 return NULL;
334         }
335
336         cfs_spin_lock(&req->rq_lock);
337         req->rq_replay = req->rq_import->imp_replayable;
338         cfs_spin_unlock(&req->rq_lock);
339
340         /* pack the intent */
341         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
342         lit->opc = (__u64)it->it_op;
343
344         /* pack the intended request */
345         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
346                       lmmsize);
347
348         /* for remote client, fetch remote perm for current user */
349         if (client_is_remote(exp))
350                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
351                                      sizeof(struct mdt_remote_perm));
352         ptlrpc_request_set_replen(req);
353         return req;
354 }
355
356 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
357                                                      struct lookup_intent *it,
358                                                      struct md_op_data *op_data)
359 {
360         struct ptlrpc_request *req;
361         struct obd_device     *obddev = class_exp2obd(exp);
362         struct ldlm_intent    *lit;
363         int                    rc;
364         ENTRY;
365
366         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
367                                    &RQF_LDLM_INTENT_UNLINK);
368         if (req == NULL)
369                 RETURN(ERR_PTR(-ENOMEM));
370
371         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
372         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
373                              op_data->op_namelen + 1);
374
375         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
376         if (rc) {
377                 ptlrpc_request_free(req);
378                 RETURN(ERR_PTR(rc));
379         }
380
381         /* pack the intent */
382         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
383         lit->opc = (__u64)it->it_op;
384
385         /* pack the intended request */
386         mdc_unlink_pack(req, op_data);
387
388         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
389                              obddev->u.cli.cl_max_mds_easize);
390         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
391                              obddev->u.cli.cl_max_mds_cookiesize);
392         ptlrpc_request_set_replen(req);
393         RETURN(req);
394 }
395
396 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
397                                                       struct lookup_intent *it,
398                                                       struct md_op_data *op_data)
399 {
400         struct ptlrpc_request *req;
401         struct obd_device     *obddev = class_exp2obd(exp);
402         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
403                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
404                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
405                                        (client_is_remote(exp) ?
406                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
407         struct ldlm_intent    *lit;
408         int                    rc;
409         ENTRY;
410
411         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
412                                    &RQF_LDLM_INTENT_GETATTR);
413         if (req == NULL)
414                 RETURN(ERR_PTR(-ENOMEM));
415
416         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
417         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
418                              op_data->op_namelen + 1);
419
420         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
421         if (rc) {
422                 ptlrpc_request_free(req);
423                 RETURN(ERR_PTR(rc));
424         }
425
426         /* pack the intent */
427         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
428         lit->opc = (__u64)it->it_op;
429
430         /* pack the intended request */
431         mdc_getattr_pack(req, valid, it->it_flags, op_data,
432                          obddev->u.cli.cl_max_mds_easize);
433
434         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
435                              obddev->u.cli.cl_max_mds_easize);
436         if (client_is_remote(exp))
437                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
438                                      sizeof(struct mdt_remote_perm));
439         ptlrpc_request_set_replen(req);
440         RETURN(req);
441 }
442
443 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
444 {
445         struct ptlrpc_request *req;
446         int rc;
447         ENTRY;
448
449         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
450         if (req == NULL)
451                 RETURN(ERR_PTR(-ENOMEM));
452
453         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
454         if (rc) {
455                 ptlrpc_request_free(req);
456                 RETURN(ERR_PTR(rc));
457         }
458
459         ptlrpc_request_set_replen(req);
460         RETURN(req);
461 }
462
/*
 * Post-process the reply of an intent enqueue.
 *
 * The function
 *  - marks the request's lock as intent-only if the request has a transno
 *    or is flagged for replay;
 *  - fixes up einfo->ei_mode and \a lockh according to \a rc and the mode of
 *    the granted lock;
 *  - copies disposition and status from the lock reply into \a it and
 *    stores \a req in it->d.lustre.it_data;
 *  - clears the replay flag of requests that must not be replayed;
 *  - checks that the reply buffers announced by the mdt_body are present
 *    and, for replayable opens, saves the reply EA in the request.
 *
 * \param exp    export the request was sent on
 * \param req    the completed enqueue request
 * \param einfo  enqueue info; ei_mode may be updated
 * \param it     intent to fill in from the reply
 * \param lockh  handle of the enqueued lock; zeroed if the lock was aborted
 * \param rc     non-negative result of the enqueue (asserted)
 *
 * \retval 0        on success (ELDLM_LOCK_ABORTED is turned into 0)
 * \retval -EPROTO  if an expected reply buffer is missing
 */
static int mdc_finish_enqueue(struct obd_export *exp,
                              struct ptlrpc_request *req,
                              struct ldlm_enqueue_info *einfo,
                              struct lookup_intent *it,
                              struct lustre_handle *lockh,
                              int rc)
{
        struct req_capsule  *pill = &req->rq_pill;
        struct ldlm_request *lockreq;
        struct ldlm_reply   *lockrep;
        ENTRY;

        LASSERT(rc >= 0);
        /* Similarly, if we're going to replay this request, we don't want to
         * actually get a lock, just perform the intent. */
        if (req->rq_transno || req->rq_replay) {
                lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
                lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
        }

        if (rc == ELDLM_LOCK_ABORTED) {
                /* No lock was granted: report "no mode" and a null handle. */
                einfo->ei_mode = 0;
                memset(lockh, 0, sizeof(*lockh));
                rc = 0;
        } else { /* rc = 0 */
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
                LASSERT(lock);

                /* If the server gave us back a different lock mode, we should
                 * fix up our variables. */
                if (lock->l_req_mode != einfo->ei_mode) {
                        /* Move our reference from the old to the new mode. */
                        ldlm_lock_addref(lockh, lock->l_req_mode);
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        einfo->ei_mode = lock->l_req_mode;
                }
                LDLM_LOCK_PUT(lock);
        }

        lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */

        /* The reply carries the intent disposition and status in the two
         * lock_policy_res fields. */
        it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
        it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
        it->d.lustre.it_lock_mode = einfo->ei_mode;
        it->d.lustre.it_lock_handle = lockh->cookie;
        it->d.lustre.it_data = req;

        if (it->d.lustre.it_status < 0 && req->rq_replay)
                mdc_clear_replay_flag(req, it->d.lustre.it_status);

        /* If we're doing an IT_OPEN which did not result in an actual
         * successful open, then we need to remove the bit which saves
         * this request for unconditional replay.
         *
         * It's important that we do this first!  Otherwise we might exit the
         * function without doing so, and try to replay a failed create
         * (bug 3440) */
        if (it->it_op & IT_OPEN && req->rq_replay &&
            (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
                mdc_clear_replay_flag(req, it->d.lustre.it_status);

        DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
                  it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);

        /* We know what to expect, so we do any byte flipping required here */
        if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                struct mdt_body *body;

                body = req_capsule_server_get(pill, &RMF_MDT_BODY);
                if (body == NULL) {
                        CERROR ("Can't swab mdt_body\n");
                        RETURN (-EPROTO);
                }

                if (it_disposition(it, DISP_OPEN_OPEN) &&
                    !it_open_error(DISP_OPEN_OPEN, it)) {
                        /*
                         * If this is a successful OPEN request, we need to set
                         * replay handler and data early, so that if replay
                         * happens immediately after swabbing below, new reply
                         * is swabbed by that handler correctly.
                         */
                        mdc_set_open_replay_data(NULL, NULL, req);
                }

                if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
                        void *eadata;

                         mdc_update_max_ea_from_body(exp, body);

                        /*
                         * The eadata is opaque; just check that it is there.
                         * Eventually, obd_unpackmd() will check the contents.
                         */
                        eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
                                                              body->eadatasize);
                        if (eadata == NULL)
                                RETURN(-EPROTO);

                        /*
                         * We save the reply LOV EA in case we have to replay a
                         * create for recovery.  If we didn't allocate a large
                         * enough request buffer above we need to reallocate it
                         * here to hold the actual LOV EA.
                         *
                         * To not save LOV EA if request is not going to replay
                         * (for example error one).
                         */
                        if ((it->it_op & IT_OPEN) && req->rq_replay) {
                                void *lmm;
                                /* Grow or shrink the request's EA field to the
                                 * size of the EA in the reply.  If growing
                                 * fails, mdc_realloc_openmsg() resets
                                 * body->eadatasize to 0, so nothing is copied
                                 * below. */
                                if (req_capsule_get_size(pill, &RMF_EADATA,
                                                         RCL_CLIENT) <
                                    body->eadatasize)
                                        mdc_realloc_openmsg(req, body);
                                else
                                        req_capsule_shrink(pill, &RMF_EADATA,
                                                           body->eadatasize,
                                                           RCL_CLIENT);

                                req_capsule_set_size(pill, &RMF_EADATA,
                                                     RCL_CLIENT,
                                                     body->eadatasize);

                                lmm = req_capsule_client_get(pill, &RMF_EADATA);
                                if (lmm)
                                        memcpy(lmm, eadata, body->eadatasize);
                        }
                }

                if (body->valid & OBD_MD_FLRMTPERM) {
                        struct mdt_remote_perm *perm;

                        LASSERT(client_is_remote(exp));
                        perm = req_capsule_server_swab_get(pill, &RMF_ACL,
                                                lustre_swab_mdt_remote_perm);
                        if (perm == NULL)
                                RETURN(-EPROTO);
                }
                if (body->valid & OBD_MD_FLMDSCAPA) {
                        struct lustre_capa *capa, *p;

                        capa = req_capsule_server_get(pill, &RMF_CAPA1);
                        if (capa == NULL)
                                RETURN(-EPROTO);

                        if (it->it_op & IT_OPEN) {
                                /* client fid capa will be checked in replay */
                                p = req_capsule_client_get(pill, &RMF_CAPA2);
                                LASSERT(p);
                                *p = *capa;
                        }
                }
                if (body->valid & OBD_MD_FLOSSCAPA) {
                        struct lustre_capa *capa;

                        /* Only the presence of the buffer is checked here. */
                        capa = req_capsule_server_get(pill, &RMF_CAPA2);
                        if (capa == NULL)
                                RETURN(-EPROTO);
                }
        }

        RETURN(rc);
}
626
/*
 * Enqueue an LDLM lock on the resource named after op_data->op_fid1,
 * optionally carrying the intent \a it.
 *
 * We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type.
 *
 * With an intent (einfo->ei_type must be LDLM_IBITS) the request is packed
 * according to it->it_op (open, unlink, getattr/lookup/layout or readdir),
 * sent under the rpc lock and the in-flight request limit, and its reply is
 * handed to mdc_finish_enqueue().  A create that the server answers with
 * -EINPROGRESS is resent as long as the import generation does not change.
 *
 * Without an intent the request is a flock enqueue: \a lmm then carries the
 * lock policy, \a lmmsize must be 0, and the result of ldlm_cli_enqueue() is
 * returned directly.
 *
 * \param exp               export to enqueue on
 * \param einfo             enqueue info; ei_cbdata is reset to NULL for opens
 * \param it                intent, or NULL for flock
 * \param op_data           operation data (fids, name, capabilities)
 * \param lockh             receives the lock handle
 * \param lmm               EA data for opens, flock policy when \a it is NULL
 * \param lmmsize           size of \a lmm (0 for flock)
 * \param reqp              must be NULL (asserted)
 * \param extra_lock_flags  base lock flags passed to ldlm_cli_enqueue()
 *
 * \retval 0     on success
 * \retval -EIO  if the import generation changed while resending
 * \retval < 0   other errors from packing, mdc_enter_request(),
 *               ldlm_cli_enqueue() or mdc_finish_enqueue()
 */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                struct lookup_intent *it, struct md_op_data *op_data,
                struct lustre_handle *lockh, void *lmm, int lmmsize,
                struct ptlrpc_request **reqp, int extra_lock_flags)
{
        struct obd_device     *obddev = class_exp2obd(exp);
        struct ptlrpc_request *req = NULL;
        int                    flags, saved_flags = extra_lock_flags;
        int                    rc;
        struct ldlm_res_id res_id;
        static const ldlm_policy_data_t lookup_policy =
                            { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
        static const ldlm_policy_data_t update_policy =
                            { .l_inodebits = { MDS_INODELOCK_UPDATE } };
        ldlm_policy_data_t const *policy = &lookup_policy;
        int                    generation, resends = 0;
        struct ldlm_reply     *lockrep;
        ENTRY;

        LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
                 einfo->ei_type);

        fid_build_reg_res_name(&op_data->op_fid1, &res_id);

        if (it)
                saved_flags |= LDLM_FL_HAS_INTENT;
        if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                policy = &update_policy;

        LASSERT(reqp == NULL);

        /* Remember the import generation so that a resend can detect that
         * the import changed in between. */
        generation = obddev->u.cli.cl_import->imp_generation;
resend:
        /* ldlm_cli_enqueue() gets a pointer to flags, so start every
         * attempt from the saved value. */
        flags = saved_flags;
        if (!it) {
                /* The only way right now is FLOCK, in this case we hide flock
                   policy as lmm, but lmmsize is 0 */
                LASSERT(lmm && lmmsize == 0);
                LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
                         einfo->ei_type);
                policy = (ldlm_policy_data_t *)lmm;
                res_id.name[3] = LDLM_FLOCK;
        } else if (it->it_op & IT_OPEN) {
                req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
                                           einfo->ei_cbdata);
                policy = &update_policy;
                einfo->ei_cbdata = NULL;
                /* NOTE(review): lmm is cleared here, so an -EINPROGRESS
                 * resend of an open packs the request with lmm == NULL while
                 * lmmsize keeps its value -- confirm this is intended. */
                lmm = NULL;
        } else if (it->it_op & IT_UNLINK)
                req = mdc_intent_unlink_pack(exp, it, op_data);
        else if (it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT))
                req = mdc_intent_getattr_pack(exp, it, op_data);
        else if (it->it_op == IT_READDIR)
                req = ldlm_enqueue_pack(exp);
        else {
                LBUG();
                RETURN(-EINVAL);
        }

        if (IS_ERR(req))
                RETURN(PTR_ERR(req));

        if (resends) {
                /* Tie the resend to the generation seen at entry and set
                 * rq_sent to the current time plus one second per resend
                 * done so far. */
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                req->rq_sent = cfs_time_current_sec() + resends;
        }

        /* It is important to obtain rpc_lock first (if applicable), so that
         * threads that are serialised with rpc_lock are not polluting our
         * rpcs in flight counter. We do not do flock request limiting, though*/
        if (it) {
                mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
                rc = mdc_enter_request(&obddev->u.cli);
                if (rc != 0) {
                        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
                        mdc_clear_replay_flag(req, 0);
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
                              0, lockh, 0);
        if (!it) {
                /* For flock requests we immediatelly return without further
                   delay and let caller deal with the rest, since rest of
                   this function metadata processing makes no sense for flock
                   requests anyway */
                RETURN(rc);
        }

        /* Undo mdc_enter_request() and mdc_get_rpc_lock() from above. */
        mdc_exit_request(&obddev->u.cli);
        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);

        if (rc < 0) {
                CERROR("ldlm_cli_enqueue: %d\n", rc);
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                RETURN(rc);
        }

        lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL);

        /* Retry the create infinitely when we get -EINPROGRESS from
         * server. This is required by the new quota design. */
        if (it && it->it_op & IT_CREAT &&
            (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
                /* This request is finished with; a new one is packed at
                 * the resend label. */
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                resends++;

                CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
                       obddev->obd_name, resends, it->it_op,
                       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

                if (generation == obddev->u.cli.cl_import->imp_generation) {
                        goto resend;
                } else {
                        /* The import generation changed since entry: give
                         * up instead of resending. */
                        CDEBUG(D_HA, "resned cross eviction\n");
                        RETURN(-EIO);
                }
        }

        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);

        RETURN(rc);
}
758
759 static int mdc_finish_intent_lock(struct obd_export *exp,
760                                   struct ptlrpc_request *request,
761                                   struct md_op_data *op_data,
762                                   struct lookup_intent *it,
763                                   struct lustre_handle *lockh)
764 {
765         struct lustre_handle old_lock;
766         struct mdt_body *mdt_body;
767         struct ldlm_lock *lock;
768         int rc;
769
770
771         LASSERT(request != NULL);
772         LASSERT(request != LP_POISON);
773         LASSERT(request->rq_repmsg != LP_POISON);
774
775         if (!it_disposition(it, DISP_IT_EXECD)) {
776                 /* The server failed before it even started executing the
777                  * intent, i.e. because it couldn't unpack the request. */
778                 LASSERT(it->d.lustre.it_status != 0);
779                 RETURN(it->d.lustre.it_status);
780         }
781         rc = it_open_error(DISP_IT_EXECD, it);
782         if (rc)
783                 RETURN(rc);
784
785         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
786         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
787
788         /* If we were revalidating a fid/name pair, mark the intent in
789          * case we fail and get called again from lookup */
790         if (fid_is_sane(&op_data->op_fid2) &&
791             it->it_create_mode & M_CHECK_STALE &&
792             it->it_op != IT_GETATTR) {
793                 it_set_disposition(it, DISP_ENQ_COMPLETE);
794
795                 /* Also: did we find the same inode? */
796                 /* sever can return one of two fids:
797                  * op_fid2 - new allocated fid - if file is created.
798                  * op_fid3 - existent fid - if file only open.
799                  * op_fid3 is saved in lmv_intent_open */
800                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
801                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
802                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
803                                "\n", PFID(&op_data->op_fid2),
804                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
805                         RETURN(-ESTALE);
806                 }
807         }
808
809         rc = it_open_error(DISP_LOOKUP_EXECD, it);
810         if (rc)
811                 RETURN(rc);
812
813         /* keep requests around for the multiple phases of the call
814          * this shows the DISP_XX must guarantee we make it into the call
815          */
816         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
817             it_disposition(it, DISP_OPEN_CREATE) &&
818             !it_open_error(DISP_OPEN_CREATE, it)) {
819                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
820                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
821         }
822         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
823             it_disposition(it, DISP_OPEN_OPEN) &&
824             !it_open_error(DISP_OPEN_OPEN, it)) {
825                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
826                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
827                 /* BUG 11546 - eviction in the middle of open rpc processing */
828                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
829         }
830
831         if (it->it_op & IT_CREAT) {
832                 /* XXX this belongs in ll_create_it */
833         } else if (it->it_op == IT_OPEN) {
834                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
835         } else {
836                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
837         }
838
839         /* If we already have a matching lock, then cancel the new
840          * one.  We have to set the data here instead of in
841          * mdc_enqueue, because we need to use the child's inode as
842          * the l_ast_data to match, and that's not available until
843          * intent_finish has performed the iget().) */
844         lock = ldlm_handle2lock(lockh);
845         if (lock) {
846                 ldlm_policy_data_t policy = lock->l_policy_data;
847                 LDLM_DEBUG(lock, "matching against this");
848
849                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
850                                          &lock->l_resource->lr_name),
851                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
852                          (unsigned long)lock->l_resource->lr_name.name[0],
853                          (unsigned long)lock->l_resource->lr_name.name[1],
854                          (unsigned long)lock->l_resource->lr_name.name[2],
855                          (unsigned long)fid_seq(&mdt_body->fid1),
856                          (unsigned long)fid_oid(&mdt_body->fid1),
857                          (unsigned long)fid_ver(&mdt_body->fid1));
858                 LDLM_LOCK_PUT(lock);
859
860                 memcpy(&old_lock, lockh, sizeof(*lockh));
861                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
862                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
863                         ldlm_lock_decref_and_cancel(lockh,
864                                                     it->d.lustre.it_lock_mode);
865                         memcpy(lockh, &old_lock, sizeof(old_lock));
866                         it->d.lustre.it_lock_handle = lockh->cookie;
867                 }
868         }
869         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
870                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
871                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
872         RETURN(rc);
873 }
874
875 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
876                         struct lu_fid *fid, __u64 *bits)
877 {
878         /* We could just return 1 immediately, but since we should only
879          * be called in revalidate_it if we already have a lock, let's
880          * verify that. */
881         struct ldlm_res_id res_id;
882         struct lustre_handle lockh;
883         ldlm_policy_data_t policy;
884         ldlm_mode_t mode;
885         ENTRY;
886
887         if (it->d.lustre.it_lock_handle) {
888                 lockh.cookie = it->d.lustre.it_lock_handle;
889                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
890         } else {
891                 fid_build_reg_res_name(fid, &res_id);
892                 switch (it->it_op) {
893                 case IT_GETATTR:
894                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
895                         break;
896                 case IT_LAYOUT:
897                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
898                         break;
899                 default:
900                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
901                         break;
902                 }
903                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
904                                        LDLM_FL_BLOCK_GRANTED, &res_id,
905                                        LDLM_IBITS, &policy,
906                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
907         }
908
909         if (mode) {
910                 it->d.lustre.it_lock_handle = lockh.cookie;
911                 it->d.lustre.it_lock_mode = mode;
912         } else {
913                 it->d.lustre.it_lock_handle = 0;
914                 it->d.lustre.it_lock_mode = 0;
915         }
916
917         RETURN(!!mode);
918 }
919
920 /*
921  * This long block is all about fixing up the lock and request state
922  * so that it is correct as of the moment _before_ the operation was
923  * applied; that way, the VFS will think that everything is normal and
924  * call Lustre's regular VFS methods.
925  *
926  * If we're performing a creation, that means that unless the creation
927  * failed with EEXIST, we should fake up a negative dentry.
928  *
 * For everything else, we want the lookup to succeed.
930  *
931  * One additional note: if CREATE or OPEN succeeded, we add an extra
932  * reference to the request because we need to keep it around until
933  * ll_create/ll_open gets called.
934  *
935  * The server will return to us, in it_disposition, an indication of
936  * exactly what d.lustre.it_status refers to.
937  *
 * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open()
 * call; otherwise, if DISP_OPEN_CREATE is set, then d.lustre.it_status is
 * the creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * was successful.
943  *
944  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
945  * child lookup.
946  */
947 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
948                     void *lmm, int lmmsize, struct lookup_intent *it,
949                     int lookup_flags, struct ptlrpc_request **reqp,
950                     ldlm_blocking_callback cb_blocking,
951                     int extra_lock_flags)
952 {
953         struct lustre_handle lockh;
954         int rc = 0;
955         ENTRY;
956         LASSERT(it);
957
958         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
959                ", intent: %s flags %#o\n", op_data->op_namelen,
960                op_data->op_name, PFID(&op_data->op_fid2),
961                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
962                it->it_flags);
963
964         lockh.cookie = 0;
965         if (fid_is_sane(&op_data->op_fid2) &&
966             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
967                 /* We could just return 1 immediately, but since we should only
968                  * be called in revalidate_it if we already have a lock, let's
969                  * verify that. */
970                 it->d.lustre.it_lock_handle = 0;
971                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
972                 /* Only return failure if it was not GETATTR by cfid
973                    (from inode_revalidate) */
974                 if (rc || op_data->op_namelen != 0)
975                         RETURN(rc);
976         }
977
978         /* lookup_it may be called only after revalidate_it has run, because
979          * revalidate_it cannot return errors, only zero.  Returning zero causes
980          * this call to lookup, which *can* return an error.
981          *
982          * We only want to execute the request associated with the intent one
983          * time, however, so don't send the request again.  Instead, skip past
984          * this and use the request from revalidate.  In this case, revalidate
985          * never dropped its reference, so the refcounts are all OK */
986         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
987                 struct ldlm_enqueue_info einfo =
988                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
989                           ldlm_completion_ast, NULL, NULL, NULL };
990
991                 /* For case if upper layer did not alloc fid, do it now. */
992                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
993                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
994                         if (rc < 0) {
995                                 CERROR("Can't alloc new fid, rc %d\n", rc);
996                                 RETURN(rc);
997                         }
998                 }
999                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1000                                  lmm, lmmsize, NULL, extra_lock_flags);
1001                 if (rc < 0)
1002                         RETURN(rc);
1003         } else if (!fid_is_sane(&op_data->op_fid2) ||
1004                    !(it->it_create_mode & M_CHECK_STALE)) {
1005                 /* DISP_ENQ_COMPLETE set means there is extra reference on
1006                  * request referenced from this intent, saved for subsequent
1007                  * lookup.  This path is executed when we proceed to this
1008                  * lookup, so we clear DISP_ENQ_COMPLETE */
1009                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1010         }
1011         *reqp = it->d.lustre.it_data;
1012         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1013         RETURN(rc);
1014 }
1015
1016 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1017                                               struct ptlrpc_request *req,
1018                                               void *args, int rc)
1019 {
1020         struct mdc_getattr_args  *ga = args;
1021         struct obd_export        *exp = ga->ga_exp;
1022         struct md_enqueue_info   *minfo = ga->ga_minfo;
1023         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1024         struct lookup_intent     *it;
1025         struct lustre_handle     *lockh;
1026         struct obd_device        *obddev;
1027         int                       flags = LDLM_FL_HAS_INTENT;
1028         ENTRY;
1029
1030         it    = &minfo->mi_it;
1031         lockh = &minfo->mi_lockh;
1032
1033         obddev = class_exp2obd(exp);
1034
1035         mdc_exit_request(&obddev->u.cli);
1036         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1037                 rc = -ETIMEDOUT;
1038
1039         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1040                                    &flags, NULL, 0, lockh, rc);
1041         if (rc < 0) {
1042                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1043                 mdc_clear_replay_flag(req, rc);
1044                 GOTO(out, rc);
1045         }
1046
1047         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1048         if (rc)
1049                 GOTO(out, rc);
1050
1051         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1052         EXIT;
1053
1054 out:
1055         OBD_FREE_PTR(einfo);
1056         minfo->mi_cb(req, minfo, rc);
1057         return 0;
1058 }
1059
1060 int mdc_intent_getattr_async(struct obd_export *exp,
1061                              struct md_enqueue_info *minfo,
1062                              struct ldlm_enqueue_info *einfo)
1063 {
1064         struct md_op_data       *op_data = &minfo->mi_data;
1065         struct lookup_intent    *it = &minfo->mi_it;
1066         struct ptlrpc_request   *req;
1067         struct mdc_getattr_args *ga;
1068         struct obd_device       *obddev = class_exp2obd(exp);
1069         struct ldlm_res_id       res_id;
1070         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1071          *     for statahead currently. Consider CMD in future, such two bits
1072          *     maybe managed by different MDS, should be adjusted then. */
1073         ldlm_policy_data_t       policy = {
1074                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1075                                                          MDS_INODELOCK_UPDATE }
1076                                  };
1077         int                      rc = 0;
1078         int                      flags = LDLM_FL_HAS_INTENT;
1079         ENTRY;
1080
1081         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1082                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1083                ldlm_it2str(it->it_op), it->it_flags);
1084
1085         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1086         req = mdc_intent_getattr_pack(exp, it, op_data);
1087         if (!req)
1088                 RETURN(-ENOMEM);
1089
1090         rc = mdc_enter_request(&obddev->u.cli);
1091         if (rc != 0) {
1092                 ptlrpc_req_finished(req);
1093                 RETURN(rc);
1094         }
1095
1096         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1097                               0, &minfo->mi_lockh, 1);
1098         if (rc < 0) {
1099                 mdc_exit_request(&obddev->u.cli);
1100                 ptlrpc_req_finished(req);
1101                 RETURN(rc);
1102         }
1103
1104         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1105         ga = ptlrpc_req_async_args(req);
1106         ga->ga_exp = exp;
1107         ga->ga_minfo = minfo;
1108         ga->ga_einfo = einfo;
1109
1110         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1111         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1112
1113         RETURN(0);
1114 }