Whamcloud - gitweb
LU-12321 mdc: allow ELC for DOM file unlink
[fs/lustre-release.git] / lustre / mdc / mdc_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36 #include <linux/kernel.h>
37
38 #include <obd_class.h>
39 #include "mdc_internal.h"
40 #include <lustre_fid.h>
41
42 /* mdc_setattr does its own semaphore handling */
43 static int mdc_reint(struct ptlrpc_request *request, int level)
44 {
45         int rc;
46
47         request->rq_send_state = level;
48
49         ptlrpc_get_mod_rpc_slot(request);
50         rc = ptlrpc_queue_wait(request);
51         ptlrpc_put_mod_rpc_slot(request);
52         if (rc)
53                 CDEBUG(D_INFO, "error in handling %d\n", rc);
54         else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY)) {
55                 rc = -EPROTO;
56         }
57         return rc;
58 }
59
60 /* Find and cancel locally locks matched by inode @bits & @mode in the resource
61  * found by @fid. Found locks are added into @cancel list. Returns the amount of
62  * locks added to @cancels list. */
63 int mdc_resource_get_unused_res(struct obd_export *exp,
64                                 struct ldlm_res_id *res_id,
65                                 struct list_head *cancels,
66                                 enum ldlm_mode mode, __u64 bits)
67 {
68         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
69         union ldlm_policy_data policy = { { 0 } };
70         struct ldlm_resource *res;
71         int count;
72
73         ENTRY;
74
75         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
76          * export) but disabled through procfs (flag in NS).
77          *
78          * This distinguishes from a case when ELC is not supported originally,
79          * when we still want to cancel locks in advance and just cancel them
80          * locally, without sending any RPC. */
81         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
82                 RETURN(0);
83
84         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
85         if (IS_ERR(res))
86                 RETURN(0);
87         LDLM_RESOURCE_ADDREF(res);
88         /* Initialize ibits lock policy. */
89         policy.l_inodebits.bits = bits;
90         count = ldlm_cancel_resource_local(res, cancels, &policy, mode, 0, 0,
91                                            NULL);
92         LDLM_RESOURCE_DELREF(res);
93         ldlm_resource_putref(res);
94         RETURN(count);
95 }
96
97 int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
98                             struct list_head *cancels, enum ldlm_mode mode,
99                             __u64 bits)
100 {
101         struct ldlm_res_id res_id;
102
103         fid_build_reg_res_name(fid, &res_id);
104         return mdc_resource_get_unused_res(exp, &res_id, cancels, mode, bits);
105 }
106
107 int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
108                 void *ea, size_t ealen, struct ptlrpc_request **request)
109 {
110         LIST_HEAD(cancels);
111         struct ptlrpc_request *req;
112         int count = 0, rc;
113         __u64 bits;
114         ENTRY;
115
116         LASSERT(op_data != NULL);
117
118         bits = MDS_INODELOCK_UPDATE;
119         if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
120                 bits |= MDS_INODELOCK_LOOKUP;
121         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
122             (fid_is_sane(&op_data->op_fid1)))
123                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
124                                                 &cancels, LCK_EX, bits);
125         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
126                                    &RQF_MDS_REINT_SETATTR);
127         if (req == NULL) {
128                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
129                 RETURN(-ENOMEM);
130         }
131
132         req_capsule_set_size(&req->rq_pill, &RMF_MDT_EPOCH, RCL_CLIENT, 0);
133         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, ealen);
134         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT, 0);
135
136         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
137         if (rc) {
138                 ptlrpc_request_free(req);
139                 RETURN(rc);
140         }
141
142         if (op_data->op_attr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
143                 CDEBUG(D_INODE, "setting mtime %lld, ctime %lld\n",
144                        (s64)op_data->op_attr.ia_mtime.tv_sec,
145                        (s64)op_data->op_attr.ia_ctime.tv_sec);
146         mdc_setattr_pack(req, op_data, ea, ealen);
147
148         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
149
150         ptlrpc_request_set_replen(req);
151
152         rc = mdc_reint(req, LUSTRE_IMP_FULL);
153         if (rc == -ERESTARTSYS)
154                 rc = 0;
155
156         *request = req;
157
158         RETURN(rc);
159 }
160
161 int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
162                 const void *data, size_t datalen,
163                 umode_t mode, uid_t uid, gid_t gid,
164                 cfs_cap_t cap_effective, __u64 rdev,
165                 struct ptlrpc_request **request)
166 {
167         struct ptlrpc_request *req;
168         int level, rc;
169         int count, resends = 0;
170         struct obd_import *import = exp->exp_obd->u.cli.cl_import;
171         int generation = import->imp_generation;
172         LIST_HEAD(cancels);
173         ENTRY;
174
175         /* For case if upper layer did not alloc fid, do it now. */
176         if (!fid_is_sane(&op_data->op_fid2)) {
177                 /*
178                  * mdc_fid_alloc() may return errno 1 in case of switch to new
179                  * sequence, handle this.
180                  */
181                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
182                 if (rc < 0)
183                         RETURN(rc);
184         }
185
186 rebuild:
187         count = 0;
188         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
189             (fid_is_sane(&op_data->op_fid1)))
190                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
191                                                 &cancels, LCK_EX,
192                                                 MDS_INODELOCK_UPDATE);
193
194         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
195                                    &RQF_MDS_REINT_CREATE_ACL);
196         if (req == NULL) {
197                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
198                 RETURN(-ENOMEM);
199         }
200
201         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
202                              op_data->op_namelen + 1);
203         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
204                              data && datalen ? datalen : 0);
205
206         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
207                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
208                              strlen(op_data->op_file_secctx_name) + 1 : 0);
209
210         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
211                              op_data->op_file_secctx_size);
212
213         /* get SELinux policy info if any */
214         rc = sptlrpc_get_sepol(req);
215         if (rc < 0) {
216                 ptlrpc_request_free(req);
217                 RETURN(rc);
218         }
219         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
220                              strlen(req->rq_sepol) ?
221                              strlen(req->rq_sepol) + 1 : 0);
222
223         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
224         if (rc) {
225                 ptlrpc_request_free(req);
226                 RETURN(rc);
227         }
228
229         /*
230          * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
231          * tgt, for symlinks or lov MD data.
232          */
233         mdc_create_pack(req, op_data, data, datalen, mode, uid,
234                         gid, cap_effective, rdev);
235
236         ptlrpc_request_set_replen(req);
237
238         /* ask ptlrpc not to resend on EINPROGRESS since we have our own retry
239          * logic here */
240         req->rq_no_retry_einprogress = 1;
241
242         if (resends) {
243                 req->rq_generation_set = 1;
244                 req->rq_import_generation = generation;
245                 req->rq_sent = ktime_get_real_seconds() + resends;
246         }
247         level = LUSTRE_IMP_FULL;
248  resend:
249         rc = mdc_reint(req, level);
250
251         /* Resend if we were told to. */
252         if (rc == -ERESTARTSYS) {
253                 level = LUSTRE_IMP_RECOVER;
254                 goto resend;
255         } else if (rc == -EINPROGRESS) {
256                 /* Retry create infinitely until succeed or get other
257                  * error code or interrupted. */
258                 ptlrpc_req_finished(req);
259                 if (generation == import->imp_generation) {
260                         if (signal_pending(current))
261                                 RETURN(-EINTR);
262
263                         resends++;
264                         CDEBUG(D_HA, "%s: resend:%d create on "DFID"/"DFID"\n",
265                                exp->exp_obd->obd_name, resends,
266                                PFID(&op_data->op_fid1),
267                                PFID(&op_data->op_fid2));
268                         goto rebuild;
269                 } else {
270                         CDEBUG(D_HA, "resend cross eviction\n");
271                         RETURN(-EIO);
272                 }
273         }
274
275         *request = req;
276         RETURN(rc);
277 }
278
279 int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
280                struct ptlrpc_request **request)
281 {
282         LIST_HEAD(cancels);
283         struct obd_device *obd = class_exp2obd(exp);
284         struct ptlrpc_request *req = *request;
285         int count = 0, rc;
286         ENTRY;
287
288         LASSERT(req == NULL);
289
290         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
291             (fid_is_sane(&op_data->op_fid1)))
292                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
293                                                 &cancels, LCK_EX,
294                                                 MDS_INODELOCK_UPDATE);
295         if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
296             (fid_is_sane(&op_data->op_fid3)))
297                 /* cancel DOM lock only if it has no data to flush */
298                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
299                                                  &cancels, LCK_EX,
300                                                  op_data->op_cli_flags &
301                                                  CLI_DIRTY_DATA ?
302                                                  MDS_INODELOCK_ELC :
303                                                  MDS_INODELOCK_FULL);
304         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
305                                    &RQF_MDS_REINT_UNLINK);
306         if (req == NULL) {
307                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
308                 RETURN(-ENOMEM);
309         }
310
311         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
312                              op_data->op_namelen + 1);
313
314         /* get SELinux policy info if any */
315         rc = sptlrpc_get_sepol(req);
316         if (rc < 0) {
317                 ptlrpc_request_free(req);
318                 RETURN(rc);
319         }
320         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
321                              strlen(req->rq_sepol) ?
322                              strlen(req->rq_sepol) + 1 : 0);
323
324         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
325         if (rc) {
326                 ptlrpc_request_free(req);
327                 RETURN(rc);
328         }
329
330         mdc_unlink_pack(req, op_data);
331
332         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
333                              obd->u.cli.cl_default_mds_easize);
334         ptlrpc_request_set_replen(req);
335
336         *request = req;
337
338         rc = mdc_reint(req, LUSTRE_IMP_FULL);
339         if (rc == -ERESTARTSYS)
340                 rc = 0;
341         RETURN(rc);
342 }
343
344 int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
345              struct ptlrpc_request **request)
346 {
347         LIST_HEAD(cancels);
348         struct ptlrpc_request *req;
349         int count = 0, rc;
350         ENTRY;
351
352         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
353             (fid_is_sane(&op_data->op_fid2)))
354                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
355                                                 &cancels, LCK_EX,
356                                                 MDS_INODELOCK_UPDATE);
357         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
358             (fid_is_sane(&op_data->op_fid1)))
359                 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
360                                                  &cancels, LCK_EX,
361                                                  MDS_INODELOCK_UPDATE);
362
363         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_REINT_LINK);
364         if (req == NULL) {
365                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
366                 RETURN(-ENOMEM);
367         }
368
369         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
370                              op_data->op_namelen + 1);
371
372         /* get SELinux policy info if any */
373         rc = sptlrpc_get_sepol(req);
374         if (rc < 0) {
375                 ptlrpc_request_free(req);
376                 RETURN(rc);
377         }
378         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
379                              strlen(req->rq_sepol) ?
380                              strlen(req->rq_sepol) + 1 : 0);
381
382         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
383         if (rc) {
384                 ptlrpc_request_free(req);
385                 RETURN(rc);
386         }
387
388         mdc_link_pack(req, op_data);
389         ptlrpc_request_set_replen(req);
390
391         rc = mdc_reint(req, LUSTRE_IMP_FULL);
392         *request = req;
393         if (rc == -ERESTARTSYS)
394                 rc = 0;
395
396         RETURN(rc);
397 }
398
399 int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
400                 const char *old, size_t oldlen, const char *new, size_t newlen,
401                 struct ptlrpc_request **request)
402 {
403         LIST_HEAD(cancels);
404         struct obd_device *obd = exp->exp_obd;
405         struct ptlrpc_request *req;
406         int count = 0, rc;
407
408         ENTRY;
409
410         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
411             (fid_is_sane(&op_data->op_fid1)))
412                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
413                                                 &cancels, LCK_EX,
414                                                 MDS_INODELOCK_UPDATE);
415         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
416             (fid_is_sane(&op_data->op_fid2)))
417                 count += mdc_resource_get_unused(exp, &op_data->op_fid2,
418                                                  &cancels, LCK_EX,
419                                                  MDS_INODELOCK_UPDATE);
420         if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
421             (fid_is_sane(&op_data->op_fid3)))
422                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
423                                                  &cancels, LCK_EX,
424                                                  MDS_INODELOCK_LOOKUP);
425         if ((op_data->op_flags & MF_MDC_CANCEL_FID4) &&
426             (fid_is_sane(&op_data->op_fid4)))
427                 count += mdc_resource_get_unused(exp, &op_data->op_fid4,
428                                                  &cancels, LCK_EX,
429                                                  MDS_INODELOCK_ELC);
430
431         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
432                            op_data->op_cli_flags & CLI_MIGRATE ?
433                            &RQF_MDS_REINT_MIGRATE : &RQF_MDS_REINT_RENAME);
434         if (req == NULL) {
435                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
436                 RETURN(-ENOMEM);
437         }
438
439         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
440         req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
441         if (op_data->op_cli_flags & CLI_MIGRATE)
442                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
443                                      op_data->op_data_size);
444
445         /* get SELinux policy info if any */
446         rc = sptlrpc_get_sepol(req);
447         if (rc < 0) {
448                 ptlrpc_request_free(req);
449                 RETURN(rc);
450         }
451         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
452                              strlen(req->rq_sepol) ?
453                              strlen(req->rq_sepol) + 1 : 0);
454
455         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
456         if (rc) {
457                 ptlrpc_request_free(req);
458                 RETURN(rc);
459         }
460
461         if (exp_connect_cancelset(exp) && req)
462                 ldlm_cli_cancel_list(&cancels, count, req, 0);
463
464         if (op_data->op_cli_flags & CLI_MIGRATE)
465                 mdc_migrate_pack(req, op_data, old, oldlen);
466         else
467                 mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
468
469         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
470                              obd->u.cli.cl_default_mds_easize);
471         ptlrpc_request_set_replen(req);
472
473         rc = mdc_reint(req, LUSTRE_IMP_FULL);
474         *request = req;
475         if (rc == -ERESTARTSYS)
476                 rc = 0;
477
478         RETURN(rc);
479 }
480
481 int mdc_file_resync(struct obd_export *exp, struct md_op_data *op_data)
482 {
483         LIST_HEAD(cancels);
484         struct ptlrpc_request *req;
485         struct ldlm_lock *lock;
486         struct mdt_rec_resync *rec;
487         int count = 0, rc;
488         ENTRY;
489
490         if (op_data->op_flags & MF_MDC_CANCEL_FID1 &&
491             fid_is_sane(&op_data->op_fid1))
492                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
493                                                 &cancels, LCK_EX,
494                                                 MDS_INODELOCK_LAYOUT);
495
496         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
497                                    &RQF_MDS_REINT_RESYNC);
498         if (req == NULL) {
499                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
500                 RETURN(-ENOMEM);
501         }
502
503         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
504         if (rc) {
505                 ptlrpc_request_free(req);
506                 RETURN(rc);
507         }
508
509         BUILD_BUG_ON(sizeof(*rec) != sizeof(struct mdt_rec_reint));
510         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
511         rec->rs_opcode  = REINT_RESYNC;
512         rec->rs_fsuid   = op_data->op_fsuid;
513         rec->rs_fsgid   = op_data->op_fsgid;
514         rec->rs_cap     = op_data->op_cap;
515         rec->rs_fid     = op_data->op_fid1;
516         rec->rs_bias    = op_data->op_bias;
517         rec->rs_mirror_id = op_data->op_mirror_id;
518
519         lock = ldlm_handle2lock(&op_data->op_lease_handle);
520         if (lock != NULL) {
521                 rec->rs_lease_handle = lock->l_remote_handle;
522                 LDLM_LOCK_PUT(lock);
523         }
524
525         ptlrpc_request_set_replen(req);
526
527         rc = mdc_reint(req, LUSTRE_IMP_FULL);
528         if (rc == -ERESTARTSYS)
529                 rc = 0;
530
531         ptlrpc_req_finished(req);
532         RETURN(rc);
533 }