Whamcloud - gitweb
LU-12616 obclass: fix MDS start/stop race
[fs/lustre-release.git] / lustre / mdc / mdc_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36 #include <linux/kernel.h>
37
38 #include <obd_class.h>
39 #include "mdc_internal.h"
40 #include <lustre_fid.h>
41
42 /* mdc_setattr does its own semaphore handling */
43 static int mdc_reint(struct ptlrpc_request *request, int level)
44 {
45         int rc;
46
47         request->rq_send_state = level;
48
49         mdc_get_mod_rpc_slot(request, NULL);
50         rc = ptlrpc_queue_wait(request);
51         mdc_put_mod_rpc_slot(request, NULL);
52         if (rc)
53                 CDEBUG(D_INFO, "error in handling %d\n", rc);
54         else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY)) {
55                 rc = -EPROTO;
56         }
57         return rc;
58 }
59
60 /* Find and cancel locally locks matched by inode @bits & @mode in the resource
61  * found by @fid. Found locks are added into @cancel list. Returns the amount of
62  * locks added to @cancels list. */
63 int mdc_resource_get_unused_res(struct obd_export *exp,
64                                 struct ldlm_res_id *res_id,
65                                 struct list_head *cancels,
66                                 enum ldlm_mode mode, __u64 bits)
67 {
68         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
69         union ldlm_policy_data policy = { { 0 } };
70         struct ldlm_resource *res;
71         int count;
72
73         ENTRY;
74
75         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
76          * export) but disabled through procfs (flag in NS).
77          *
78          * This distinguishes from a case when ELC is not supported originally,
79          * when we still want to cancel locks in advance and just cancel them
80          * locally, without sending any RPC. */
81         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
82                 RETURN(0);
83
84         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
85         if (IS_ERR(res))
86                 RETURN(0);
87         LDLM_RESOURCE_ADDREF(res);
88         /* Initialize ibits lock policy. */
89         policy.l_inodebits.bits = bits;
90         count = ldlm_cancel_resource_local(res, cancels, &policy, mode, 0, 0,
91                                            NULL);
92         LDLM_RESOURCE_DELREF(res);
93         ldlm_resource_putref(res);
94         RETURN(count);
95 }
96
97 int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
98                             struct list_head *cancels, enum ldlm_mode mode,
99                             __u64 bits)
100 {
101         struct ldlm_res_id res_id;
102
103         fid_build_reg_res_name(fid, &res_id);
104         return mdc_resource_get_unused_res(exp, &res_id, cancels, mode, bits);
105 }
106
107 int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
108                 void *ea, size_t ealen, struct ptlrpc_request **request)
109 {
110         struct list_head cancels = LIST_HEAD_INIT(cancels);
111         struct ptlrpc_request *req;
112         int count = 0, rc;
113         __u64 bits;
114         ENTRY;
115
116         LASSERT(op_data != NULL);
117
118         bits = MDS_INODELOCK_UPDATE;
119         if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
120                 bits |= MDS_INODELOCK_LOOKUP;
121         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
122             (fid_is_sane(&op_data->op_fid1)))
123                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
124                                                 &cancels, LCK_EX, bits);
125         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
126                                    &RQF_MDS_REINT_SETATTR);
127         if (req == NULL) {
128                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
129                 RETURN(-ENOMEM);
130         }
131
132         req_capsule_set_size(&req->rq_pill, &RMF_MDT_EPOCH, RCL_CLIENT, 0);
133         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, ealen);
134         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT, 0);
135
136         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
137         if (rc) {
138                 ptlrpc_request_free(req);
139                 RETURN(rc);
140         }
141
142         if (op_data->op_attr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
143                 CDEBUG(D_INODE, "setting mtime %lld, ctime %lld\n",
144                        (s64)op_data->op_attr.ia_mtime.tv_sec,
145                        (s64)op_data->op_attr.ia_ctime.tv_sec);
146         mdc_setattr_pack(req, op_data, ea, ealen);
147
148         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
149
150         ptlrpc_request_set_replen(req);
151
152         rc = mdc_reint(req, LUSTRE_IMP_FULL);
153         if (rc == -ERESTARTSYS)
154                 rc = 0;
155
156         *request = req;
157
158         RETURN(rc);
159 }
160
161 int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
162                 const void *data, size_t datalen,
163                 umode_t mode, uid_t uid, gid_t gid,
164                 cfs_cap_t cap_effective, __u64 rdev,
165                 struct ptlrpc_request **request)
166 {
167         struct ptlrpc_request *req;
168         int level, rc;
169         int count, resends = 0;
170         struct obd_import *import = exp->exp_obd->u.cli.cl_import;
171         int generation = import->imp_generation;
172         struct list_head cancels = LIST_HEAD_INIT(cancels);
173         ENTRY;
174
175         /* For case if upper layer did not alloc fid, do it now. */
176         if (!fid_is_sane(&op_data->op_fid2)) {
177                 /*
178                  * mdc_fid_alloc() may return errno 1 in case of switch to new
179                  * sequence, handle this.
180                  */
181                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
182                 if (rc < 0)
183                         RETURN(rc);
184         }
185
186 rebuild:
187         count = 0;
188         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
189             (fid_is_sane(&op_data->op_fid1)))
190                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
191                                                 &cancels, LCK_EX,
192                                                 MDS_INODELOCK_UPDATE);
193
194         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
195                                    &RQF_MDS_REINT_CREATE_ACL);
196         if (req == NULL) {
197                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
198                 RETURN(-ENOMEM);
199         }
200
201         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
202                              op_data->op_namelen + 1);
203         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
204                              data && datalen ? datalen : 0);
205
206         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
207                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
208                              strlen(op_data->op_file_secctx_name) + 1 : 0);
209
210         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
211                              op_data->op_file_secctx_size);
212
213         /* get SELinux policy info if any */
214         rc = sptlrpc_get_sepol(req);
215         if (rc < 0) {
216                 ptlrpc_request_free(req);
217                 RETURN(rc);
218         }
219         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
220                              strlen(req->rq_sepol) ?
221                              strlen(req->rq_sepol) + 1 : 0);
222
223         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
224         if (rc) {
225                 ptlrpc_request_free(req);
226                 RETURN(rc);
227         }
228
229         /*
230          * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
231          * tgt, for symlinks or lov MD data.
232          */
233         mdc_create_pack(req, op_data, data, datalen, mode, uid,
234                         gid, cap_effective, rdev);
235
236         ptlrpc_request_set_replen(req);
237
238         /* ask ptlrpc not to resend on EINPROGRESS since we have our own retry
239          * logic here */
240         req->rq_no_retry_einprogress = 1;
241
242         if (resends) {
243                 req->rq_generation_set = 1;
244                 req->rq_import_generation = generation;
245                 req->rq_sent = ktime_get_real_seconds() + resends;
246         }
247         level = LUSTRE_IMP_FULL;
248  resend:
249         rc = mdc_reint(req, level);
250
251         /* Resend if we were told to. */
252         if (rc == -ERESTARTSYS) {
253                 level = LUSTRE_IMP_RECOVER;
254                 goto resend;
255         } else if (rc == -EINPROGRESS) {
256                 /* Retry create infinitely until succeed or get other
257                  * error code or interrupted. */
258                 ptlrpc_req_finished(req);
259                 if (generation == import->imp_generation) {
260                         if (signal_pending(current))
261                                 RETURN(-EINTR);
262
263                         resends++;
264                         CDEBUG(D_HA, "%s: resend:%d create on "DFID"/"DFID"\n",
265                                exp->exp_obd->obd_name, resends,
266                                PFID(&op_data->op_fid1),
267                                PFID(&op_data->op_fid2));
268                         goto rebuild;
269                 } else {
270                         CDEBUG(D_HA, "resend cross eviction\n");
271                         RETURN(-EIO);
272                 }
273         }
274
275         *request = req;
276         RETURN(rc);
277 }
278
279 int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
280                struct ptlrpc_request **request)
281 {
282         struct list_head cancels = LIST_HEAD_INIT(cancels);
283         struct obd_device *obd = class_exp2obd(exp);
284         struct ptlrpc_request *req = *request;
285         int count = 0, rc;
286         ENTRY;
287
288         LASSERT(req == NULL);
289
290         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
291             (fid_is_sane(&op_data->op_fid1)))
292                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
293                                                 &cancels, LCK_EX,
294                                                 MDS_INODELOCK_UPDATE);
295         if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
296             (fid_is_sane(&op_data->op_fid3)))
297                 /* don't cancel DoM lock which may cause data flush */
298                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
299                                                  &cancels, LCK_EX,
300                                                  MDS_INODELOCK_ELC);
301         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
302                                    &RQF_MDS_REINT_UNLINK);
303         if (req == NULL) {
304                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
305                 RETURN(-ENOMEM);
306         }
307
308         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
309                              op_data->op_namelen + 1);
310
311         /* get SELinux policy info if any */
312         rc = sptlrpc_get_sepol(req);
313         if (rc < 0) {
314                 ptlrpc_request_free(req);
315                 RETURN(rc);
316         }
317         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
318                              strlen(req->rq_sepol) ?
319                              strlen(req->rq_sepol) + 1 : 0);
320
321         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
322         if (rc) {
323                 ptlrpc_request_free(req);
324                 RETURN(rc);
325         }
326
327         mdc_unlink_pack(req, op_data);
328
329         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
330                              obd->u.cli.cl_default_mds_easize);
331         ptlrpc_request_set_replen(req);
332
333         *request = req;
334
335         rc = mdc_reint(req, LUSTRE_IMP_FULL);
336         if (rc == -ERESTARTSYS)
337                 rc = 0;
338         RETURN(rc);
339 }
340
341 int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
342              struct ptlrpc_request **request)
343 {
344         struct list_head cancels = LIST_HEAD_INIT(cancels);
345         struct ptlrpc_request *req;
346         int count = 0, rc;
347         ENTRY;
348
349         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
350             (fid_is_sane(&op_data->op_fid2)))
351                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
352                                                 &cancels, LCK_EX,
353                                                 MDS_INODELOCK_UPDATE);
354         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
355             (fid_is_sane(&op_data->op_fid1)))
356                 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
357                                                  &cancels, LCK_EX,
358                                                  MDS_INODELOCK_UPDATE);
359
360         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_REINT_LINK);
361         if (req == NULL) {
362                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
363                 RETURN(-ENOMEM);
364         }
365
366         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
367                              op_data->op_namelen + 1);
368
369         /* get SELinux policy info if any */
370         rc = sptlrpc_get_sepol(req);
371         if (rc < 0) {
372                 ptlrpc_request_free(req);
373                 RETURN(rc);
374         }
375         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
376                              strlen(req->rq_sepol) ?
377                              strlen(req->rq_sepol) + 1 : 0);
378
379         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
380         if (rc) {
381                 ptlrpc_request_free(req);
382                 RETURN(rc);
383         }
384
385         mdc_link_pack(req, op_data);
386         ptlrpc_request_set_replen(req);
387
388         rc = mdc_reint(req, LUSTRE_IMP_FULL);
389         *request = req;
390         if (rc == -ERESTARTSYS)
391                 rc = 0;
392
393         RETURN(rc);
394 }
395
396 int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
397                 const char *old, size_t oldlen, const char *new, size_t newlen,
398                 struct ptlrpc_request **request)
399 {
400         struct list_head cancels = LIST_HEAD_INIT(cancels);
401         struct obd_device *obd = exp->exp_obd;
402         struct ptlrpc_request *req;
403         int count = 0, rc;
404
405         ENTRY;
406
407         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
408             (fid_is_sane(&op_data->op_fid1)))
409                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
410                                                 &cancels, LCK_EX,
411                                                 MDS_INODELOCK_UPDATE);
412         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
413             (fid_is_sane(&op_data->op_fid2)))
414                 count += mdc_resource_get_unused(exp, &op_data->op_fid2,
415                                                  &cancels, LCK_EX,
416                                                  MDS_INODELOCK_UPDATE);
417         if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
418             (fid_is_sane(&op_data->op_fid3)))
419                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
420                                                  &cancels, LCK_EX,
421                                                  MDS_INODELOCK_LOOKUP);
422         if ((op_data->op_flags & MF_MDC_CANCEL_FID4) &&
423             (fid_is_sane(&op_data->op_fid4)))
424                 count += mdc_resource_get_unused(exp, &op_data->op_fid4,
425                                                  &cancels, LCK_EX,
426                                                  MDS_INODELOCK_ELC);
427
428         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
429                            op_data->op_cli_flags & CLI_MIGRATE ?
430                            &RQF_MDS_REINT_MIGRATE : &RQF_MDS_REINT_RENAME);
431         if (req == NULL) {
432                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
433                 RETURN(-ENOMEM);
434         }
435
436         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
437         req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
438         if (op_data->op_cli_flags & CLI_MIGRATE)
439                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
440                                      op_data->op_data_size);
441
442         /* get SELinux policy info if any */
443         rc = sptlrpc_get_sepol(req);
444         if (rc < 0) {
445                 ptlrpc_request_free(req);
446                 RETURN(rc);
447         }
448         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
449                              strlen(req->rq_sepol) ?
450                              strlen(req->rq_sepol) + 1 : 0);
451
452         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
453         if (rc) {
454                 ptlrpc_request_free(req);
455                 RETURN(rc);
456         }
457
458         if (exp_connect_cancelset(exp) && req)
459                 ldlm_cli_cancel_list(&cancels, count, req, 0);
460
461         if (op_data->op_cli_flags & CLI_MIGRATE)
462                 mdc_migrate_pack(req, op_data, old, oldlen);
463         else
464                 mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
465
466         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
467                              obd->u.cli.cl_default_mds_easize);
468         ptlrpc_request_set_replen(req);
469
470         rc = mdc_reint(req, LUSTRE_IMP_FULL);
471         *request = req;
472         if (rc == -ERESTARTSYS)
473                 rc = 0;
474
475         RETURN(rc);
476 }
477
478 int mdc_file_resync(struct obd_export *exp, struct md_op_data *op_data)
479 {
480         struct list_head cancels = LIST_HEAD_INIT(cancels);
481         struct ptlrpc_request *req;
482         struct ldlm_lock *lock;
483         struct mdt_rec_resync *rec;
484         int count = 0, rc;
485         ENTRY;
486
487         if (op_data->op_flags & MF_MDC_CANCEL_FID1 &&
488             fid_is_sane(&op_data->op_fid1))
489                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
490                                                 &cancels, LCK_EX,
491                                                 MDS_INODELOCK_LAYOUT);
492
493         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
494                                    &RQF_MDS_REINT_RESYNC);
495         if (req == NULL) {
496                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
497                 RETURN(-ENOMEM);
498         }
499
500         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
501         if (rc) {
502                 ptlrpc_request_free(req);
503                 RETURN(rc);
504         }
505
506         CLASSERT(sizeof(*rec) == sizeof(struct mdt_rec_reint));
507         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
508         rec->rs_opcode  = REINT_RESYNC;
509         rec->rs_fsuid   = op_data->op_fsuid;
510         rec->rs_fsgid   = op_data->op_fsgid;
511         rec->rs_cap     = op_data->op_cap;
512         rec->rs_fid     = op_data->op_fid1;
513         rec->rs_bias    = op_data->op_bias;
514         rec->rs_mirror_id = op_data->op_mirror_id;
515
516         lock = ldlm_handle2lock(&op_data->op_lease_handle);
517         if (lock != NULL) {
518                 rec->rs_lease_handle = lock->l_remote_handle;
519                 LDLM_LOCK_PUT(lock);
520         }
521
522         ptlrpc_request_set_replen(req);
523
524         rc = mdc_reint(req, LUSTRE_IMP_FULL);
525         if (rc == -ERESTARTSYS)
526                 rc = 0;
527
528         ptlrpc_req_finished(req);
529         RETURN(rc);
530 }