Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / mdc / mdc_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDC
33
34 #include <linux/module.h>
35 #include <linux/kernel.h>
36
37 #include <obd_class.h>
38 #include "mdc_internal.h"
39 #include <lustre_fid.h>
40
41 /* mdc_setattr does its own semaphore handling */
42 static int mdc_reint(struct ptlrpc_request *request, int level)
43 {
44         int rc;
45
46         request->rq_send_state = level;
47
48         ptlrpc_get_mod_rpc_slot(request);
49         rc = ptlrpc_queue_wait(request);
50         ptlrpc_put_mod_rpc_slot(request);
51         if (rc)
52                 CDEBUG(D_INFO, "error in handling %d\n", rc);
53         else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY)) {
54                 rc = -EPROTO;
55         }
56         return rc;
57 }
58
59 /* Find and cancel locally locks matched by inode @bits & @mode in the resource
60  * found by @fid. Found locks are added into @cancel list. Returns the amount of
61  * locks added to @cancels list. */
62 int mdc_resource_get_unused_res(struct obd_export *exp,
63                                 struct ldlm_res_id *res_id,
64                                 struct list_head *cancels,
65                                 enum ldlm_mode mode, __u64 bits)
66 {
67         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
68         union ldlm_policy_data policy = { { 0 } };
69         struct ldlm_resource *res;
70         int count;
71
72         ENTRY;
73
74         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
75          * export) but disabled through procfs (flag in NS).
76          *
77          * This distinguishes from a case when ELC is not supported originally,
78          * when we still want to cancel locks in advance and just cancel them
79          * locally, without sending any RPC. */
80         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
81                 RETURN(0);
82
83         res = ldlm_resource_get(ns, res_id, 0, 0);
84         if (IS_ERR(res))
85                 RETURN(0);
86         LDLM_RESOURCE_ADDREF(res);
87         /* Initialize ibits lock policy. */
88         policy.l_inodebits.bits = bits;
89         count = ldlm_cancel_resource_local(res, cancels, &policy, mode, 0, 0,
90                                            NULL);
91         LDLM_RESOURCE_DELREF(res);
92         ldlm_resource_putref(res);
93         RETURN(count);
94 }
95
96 int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
97                             struct list_head *cancels, enum ldlm_mode mode,
98                             __u64 bits)
99 {
100         struct ldlm_res_id res_id;
101
102         fid_build_reg_res_name(fid, &res_id);
103         return mdc_resource_get_unused_res(exp, &res_id, cancels, mode, bits);
104 }
105
106 int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
107                 void *ea, size_t ealen, struct ptlrpc_request **request)
108 {
109         LIST_HEAD(cancels);
110         struct ptlrpc_request *req;
111         int count = 0, rc;
112         __u64 bits;
113         ENTRY;
114
115         LASSERT(op_data != NULL);
116
117         bits = MDS_INODELOCK_UPDATE;
118         if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
119                 bits |= MDS_INODELOCK_LOOKUP;
120         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
121             (fid_is_sane(&op_data->op_fid1)))
122                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
123                                                 &cancels, LCK_EX, bits);
124         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
125                                    &RQF_MDS_REINT_SETATTR);
126         if (req == NULL) {
127                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
128                 RETURN(-ENOMEM);
129         }
130
131         req_capsule_set_size(&req->rq_pill, &RMF_MDT_EPOCH, RCL_CLIENT, 0);
132         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, ealen);
133         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT, 0);
134
135         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
136         if (rc) {
137                 ptlrpc_request_free(req);
138                 RETURN(rc);
139         }
140
141         if (op_data->op_attr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
142                 CDEBUG(D_INODE, "setting mtime %lld, ctime %lld\n",
143                        (s64)op_data->op_attr.ia_mtime.tv_sec,
144                        (s64)op_data->op_attr.ia_ctime.tv_sec);
145         mdc_setattr_pack(&req->rq_pill, op_data, ea, ealen);
146
147         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
148
149         ptlrpc_request_set_replen(req);
150
151         rc = mdc_reint(req, LUSTRE_IMP_FULL);
152         if (rc == -ERESTARTSYS)
153                 rc = 0;
154
155         *request = req;
156
157         RETURN(rc);
158 }
159
160 int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
161                 const void *data, size_t datalen,
162                 umode_t mode, uid_t uid, gid_t gid,
163                 kernel_cap_t cap_effective, __u64 rdev,
164                 struct ptlrpc_request **request)
165 {
166         struct ptlrpc_request *req;
167         struct sptlrpc_sepol *sepol;
168         int level, rc;
169         int count, resends = 0;
170         struct obd_import *import = exp->exp_obd->u.cli.cl_import;
171         int generation = import->imp_generation;
172         LIST_HEAD(cancels);
173
174         ENTRY;
175
176         /* For case if upper layer did not alloc fid, do it now. */
177         if (!fid_is_sane(&op_data->op_fid2)) {
178                 /*
179                  * mdc_fid_alloc() may return errno 1 in case of switch to new
180                  * sequence, handle this.
181                  */
182                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
183                 if (rc < 0)
184                         RETURN(rc);
185         }
186
187 rebuild:
188         count = 0;
189         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
190             (fid_is_sane(&op_data->op_fid1)))
191                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
192                                                 &cancels, LCK_EX,
193                                                 MDS_INODELOCK_UPDATE);
194
195         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
196                                    &RQF_MDS_REINT_CREATE_ACL);
197         if (req == NULL) {
198                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
199                 RETURN(-ENOMEM);
200         }
201
202         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
203                              op_data->op_namelen + 1);
204         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
205                              data && datalen ? datalen : 0);
206
207         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
208                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
209                              strlen(op_data->op_file_secctx_name) + 1 : 0);
210
211         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
212                              op_data->op_file_secctx_size);
213
214         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
215                              op_data->op_file_encctx_size);
216
217         /* get SELinux policy info if any */
218         sepol = sptlrpc_sepol_get(req);
219         if (IS_ERR(sepol))
220                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
221
222         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
223                              sptlrpc_sepol_size(sepol));
224
225         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
226         if (rc)
227                 GOTO(err_put_sepol, rc);
228
229         /*
230          * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
231          * tgt, for symlinks or lov MD data.
232          */
233         mdc_create_pack(&req->rq_pill, op_data, data, datalen, mode, uid,
234                         gid, cap_effective, rdev, sepol);
235
236         sptlrpc_sepol_put(sepol);
237
238         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
239                              exp->exp_obd->u.cli.cl_default_mds_easize);
240         ptlrpc_request_set_replen(req);
241
242         /* ask ptlrpc not to resend on EINPROGRESS since we have our own retry
243          * logic here */
244         req->rq_no_retry_einprogress = 1;
245
246         if (resends) {
247                 req->rq_generation_set = 1;
248                 req->rq_import_generation = generation;
249                 req->rq_sent = ktime_get_real_seconds() + resends;
250         }
251         level = LUSTRE_IMP_FULL;
252  resend:
253         rc = mdc_reint(req, level);
254
255         /* Resend if we were told to. */
256         if (rc == -ERESTARTSYS) {
257                 level = LUSTRE_IMP_RECOVER;
258                 goto resend;
259         } else if (rc == -EINPROGRESS) {
260                 /* Retry create infinitely until succeed or get other
261                  * error code or interrupted. */
262                 ptlrpc_req_finished(req);
263                 if (generation == import->imp_generation) {
264                         if (signal_pending(current))
265                                 RETURN(-EINTR);
266
267                         resends++;
268                         CDEBUG(D_HA, "%s: resend:%d create on "DFID"/"DFID"\n",
269                                exp->exp_obd->obd_name, resends,
270                                PFID(&op_data->op_fid1),
271                                PFID(&op_data->op_fid2));
272                         goto rebuild;
273                 } else {
274                         CDEBUG(D_HA, "resend cross eviction\n");
275                         RETURN(-EIO);
276                 }
277         } else if (rc == 0 && S_ISDIR(mode)) {
278                 struct mdt_body *body;
279
280                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
281                 if (body == NULL) {
282                         rc = -EPROTO;
283                         CERROR("%s: cannot swab mdt_body: rc = %d\n",
284                                exp->exp_obd->obd_name, rc);
285                         RETURN(rc);
286                 }
287
288                 if ((body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_MEA)) ==
289                     (OBD_MD_FLDIREA | OBD_MD_MEA)) {
290                         void *eadata;
291
292                         /* clear valid, because mkdir doesn't need to initialize
293                          * LMV, which will be delayed to lookup.
294                          */
295                         body->mbo_valid &= ~(OBD_MD_FLDIREA | OBD_MD_MEA);
296                         mdc_update_max_ea_from_body(exp, body);
297                         /* The eadata is opaque; just check that it is there.
298                          * Eventually, obd_unpackmd() will check the contents.
299                          */
300                         eadata = req_capsule_server_sized_get(&req->rq_pill,
301                                                           &RMF_MDT_MD,
302                                                           body->mbo_eadatasize);
303                         if (eadata == NULL)
304                                 RETURN(-EPROTO);
305
306                         /* save the reply LMV EA in case we have to replay a
307                          * create for recovery.
308                          */
309                         rc = mdc_save_lmm(req, eadata, body->mbo_eadatasize);
310                 }
311         }
312
313         *request = req;
314
315         RETURN(rc);
316
317 err_put_sepol:
318         sptlrpc_sepol_put(sepol);
319 err_free_rq:
320         ptlrpc_request_free(req);
321
322         RETURN(rc);
323 }
324
325 int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
326                struct ptlrpc_request **request)
327 {
328         LIST_HEAD(cancels);
329         struct obd_device *obd = class_exp2obd(exp);
330         struct ptlrpc_request *req = *request;
331         struct sptlrpc_sepol *sepol;
332         int count = 0, rc;
333         ENTRY;
334
335         LASSERT(req == NULL);
336
337         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
338             (fid_is_sane(&op_data->op_fid1)))
339                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
340                                                 &cancels, LCK_EX,
341                                                 MDS_INODELOCK_UPDATE);
342         if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
343             (fid_is_sane(&op_data->op_fid3)))
344                 /* cancel DOM lock only if it has no data to flush */
345                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
346                                                  &cancels, LCK_EX,
347                                                  op_data->op_cli_flags &
348                                                  CLI_DIRTY_DATA ?
349                                                  MDS_INODELOCK_ELC :
350                                                  MDS_INODELOCK_FULL);
351         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
352                                    &RQF_MDS_REINT_UNLINK);
353         if (req == NULL) {
354                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
355                 RETURN(-ENOMEM);
356         }
357
358         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
359                              op_data->op_namelen + 1);
360
361         /* get SELinux policy info if any */
362         sepol = sptlrpc_sepol_get(req);
363         if (IS_ERR(sepol))
364                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
365
366         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
367                              sptlrpc_sepol_size(sepol));
368
369         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
370         if (rc)
371                 GOTO(err_put_sepol, rc);
372
373         mdc_unlink_pack(&req->rq_pill, op_data, sepol);
374         sptlrpc_sepol_put(sepol);
375
376         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
377                              obd->u.cli.cl_default_mds_easize);
378         ptlrpc_request_set_replen(req);
379
380         *request = req;
381
382         rc = mdc_reint(req, LUSTRE_IMP_FULL);
383         if (rc == -ERESTARTSYS)
384                 rc = 0;
385
386         RETURN(rc);
387
388 err_put_sepol:
389         sptlrpc_sepol_put(sepol);
390 err_free_rq:
391         ptlrpc_request_free(req);
392
393         RETURN(rc);
394 }
395
396 int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
397              struct ptlrpc_request **request)
398 {
399         LIST_HEAD(cancels);
400         struct ptlrpc_request *req;
401         struct sptlrpc_sepol *sepol;
402         int count = 0, rc;
403         ENTRY;
404
405         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
406             (fid_is_sane(&op_data->op_fid2)))
407                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
408                                                 &cancels, LCK_EX,
409                                                 MDS_INODELOCK_UPDATE);
410         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
411             (fid_is_sane(&op_data->op_fid1)))
412                 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
413                                                  &cancels, LCK_EX,
414                                                  MDS_INODELOCK_UPDATE);
415
416         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_REINT_LINK);
417         if (req == NULL) {
418                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
419                 RETURN(-ENOMEM);
420         }
421
422         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
423                              op_data->op_namelen + 1);
424
425         /* get SELinux policy info if any */
426         sepol = sptlrpc_sepol_get(req);
427         if (IS_ERR(sepol))
428                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
429
430         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
431                              sptlrpc_sepol_size(sepol));
432
433         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
434         if (rc)
435                 GOTO(err_put_sepol, rc);
436
437         mdc_link_pack(&req->rq_pill, op_data, sepol);
438         sptlrpc_sepol_put(sepol);
439
440         ptlrpc_request_set_replen(req);
441
442         rc = mdc_reint(req, LUSTRE_IMP_FULL);
443         *request = req;
444         if (rc == -ERESTARTSYS)
445                 rc = 0;
446
447         RETURN(rc);
448
449 err_put_sepol:
450         sptlrpc_sepol_put(sepol);
451 err_free_rq:
452         ptlrpc_request_free(req);
453
454         RETURN(rc);
455 }
456
457 int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
458                 const char *old, size_t oldlen, const char *new, size_t newlen,
459                 struct ptlrpc_request **request)
460 {
461         LIST_HEAD(cancels);
462         struct obd_device *obd = exp->exp_obd;
463         struct ptlrpc_request *req;
464         struct sptlrpc_sepol *sepol;
465         int count = 0, rc;
466
467         ENTRY;
468
469         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
470             (fid_is_sane(&op_data->op_fid1)))
471                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
472                                                 &cancels, LCK_EX,
473                                                 MDS_INODELOCK_UPDATE);
474         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
475             (fid_is_sane(&op_data->op_fid2)))
476                 count += mdc_resource_get_unused(exp, &op_data->op_fid2,
477                                                  &cancels, LCK_EX,
478                                                  MDS_INODELOCK_UPDATE);
479         if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
480             (fid_is_sane(&op_data->op_fid3)))
481                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
482                                                  &cancels, LCK_EX,
483                                                  MDS_INODELOCK_LOOKUP);
484         if ((op_data->op_flags & MF_MDC_CANCEL_FID4) &&
485             (fid_is_sane(&op_data->op_fid4)))
486                 count += mdc_resource_get_unused(exp, &op_data->op_fid4,
487                                                  &cancels, LCK_EX,
488                                                  MDS_INODELOCK_ELC);
489
490         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
491                            op_data->op_cli_flags & CLI_MIGRATE ?
492                            &RQF_MDS_REINT_MIGRATE : &RQF_MDS_REINT_RENAME);
493         if (req == NULL) {
494                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
495                 RETURN(-ENOMEM);
496         }
497
498         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
499         req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
500         if (op_data->op_cli_flags & CLI_MIGRATE)
501                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
502                                      op_data->op_data_size);
503
504         /* get SELinux policy info if any */
505         sepol = sptlrpc_sepol_get(req);
506         if (IS_ERR(sepol))
507                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
508
509         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
510                              sptlrpc_sepol_size(sepol));
511
512         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
513         if (rc)
514                 GOTO(err_put_sepol, rc);
515
516         if (exp_connect_cancelset(exp) && req)
517                 ldlm_cli_cancel_list(&cancels, count, req, 0);
518
519         if (op_data->op_cli_flags & CLI_MIGRATE)
520                 mdc_migrate_pack(&req->rq_pill, op_data, old, oldlen);
521         else
522                 mdc_rename_pack(&req->rq_pill, op_data, old, oldlen,
523                                 new, newlen, sepol);
524
525         sptlrpc_sepol_put(sepol);
526
527         /* LU-17441: avoid blocking MDS_REQUEST_PORTAL for renames with BFL */
528 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 20, 53, 0)
529         /* MDS_IO_PORTAL available since v2_10_53_0-33-g2bcc5ad0ed */
530         if ((exp_connect_flags(exp) &
531              (OBD_CONNECT_GRANT | OBD_CONNECT_SRVLOCK)) ==
532             (OBD_CONNECT_GRANT | OBD_CONNECT_SRVLOCK))
533 #endif
534                 req->rq_request_portal = MDS_IO_PORTAL;
535         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
536                              obd->u.cli.cl_default_mds_easize);
537         ptlrpc_request_set_replen(req);
538
539         rc = mdc_reint(req, LUSTRE_IMP_FULL);
540         *request = req;
541         if (rc == -ERESTARTSYS)
542                 rc = 0;
543
544         RETURN(rc);
545
546 err_put_sepol:
547         sptlrpc_sepol_put(sepol);
548 err_free_rq:
549         ptlrpc_request_free(req);
550
551         RETURN(rc);
552 }
553
554 int mdc_file_resync(struct obd_export *exp, struct md_op_data *op_data)
555 {
556         LIST_HEAD(cancels);
557         struct ptlrpc_request *req;
558         struct ldlm_lock *lock;
559         struct mdt_rec_resync *rec;
560         int count = 0, rc;
561         ENTRY;
562
563         if (op_data->op_flags & MF_MDC_CANCEL_FID1 &&
564             fid_is_sane(&op_data->op_fid1))
565                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
566                                                 &cancels, LCK_EX,
567                                                 MDS_INODELOCK_LAYOUT);
568
569         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
570                                    &RQF_MDS_REINT_RESYNC);
571         if (req == NULL) {
572                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
573                 RETURN(-ENOMEM);
574         }
575
576         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
577         if (rc) {
578                 ptlrpc_request_free(req);
579                 RETURN(rc);
580         }
581
582         BUILD_BUG_ON(sizeof(*rec) != sizeof(struct mdt_rec_reint));
583         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
584         rec->rs_opcode  = REINT_RESYNC;
585         rec->rs_fsuid   = op_data->op_fsuid;
586         rec->rs_fsgid   = op_data->op_fsgid;
587         rec->rs_cap     = ll_capability_u32(op_data->op_cap);
588         rec->rs_fid     = op_data->op_fid1;
589         rec->rs_bias    = op_data->op_bias;
590         rec->rs_mirror_id = op_data->op_mirror_id;
591
592         lock = ldlm_handle2lock(&op_data->op_lease_handle);
593         if (lock != NULL) {
594                 rec->rs_lease_handle = lock->l_remote_handle;
595                 LDLM_LOCK_PUT(lock);
596         }
597
598         ptlrpc_request_set_replen(req);
599
600         rc = mdc_reint(req, LUSTRE_IMP_FULL);
601         if (rc == -ERESTARTSYS)
602                 rc = 0;
603
604         ptlrpc_req_finished(req);
605         RETURN(rc);
606 }