Whamcloud - gitweb
LU-19098 hsm: don't print progname twice with lhsmtool
[fs/lustre-release.git] / lustre / mdc / mdc_reint.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
5  * Use is subject to license terms.
6  *
7  * Copyright (c) 2012, 2017, Intel Corporation.
8  */
9
10 /*
11  * This file is part of Lustre, http://www.lustre.org/
12  */
13
14 #define DEBUG_SUBSYSTEM S_MDC
15
16 #include <linux/module.h>
17 #include <linux/kernel.h>
18
19 #include <obd_class.h>
20 #include "mdc_internal.h"
21 #include <lustre_fid.h>
22
23 /* mdc_setattr does its own semaphore handling */
24 static int mdc_reint(struct ptlrpc_request *request, int level)
25 {
26         int rc;
27
28         request->rq_send_state = level;
29
30         ptlrpc_get_mod_rpc_slot(request);
31         rc = ptlrpc_queue_wait(request);
32         ptlrpc_put_mod_rpc_slot(request);
33         if (rc)
34                 CDEBUG(D_INFO, "error in handling %d\n", rc);
35         else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY))
36                 rc = -EPROTO;
37         return rc;
38 }
39
40 /* Find and cancel locally locks matched by inode @bits & @mode in the resource
41  * found by @fid. Found locks are added into @cancel list. Returns the amount of
42  * locks added to @cancels list.
43  */
44 int mdc_resource_cancel_unused_res(struct obd_export *exp,
45                                    struct ldlm_res_id *res_id,
46                                    struct list_head *cancels,
47                                    enum ldlm_mode mode, __u64 bits)
48 {
49         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
50         union ldlm_policy_data policy = { { 0 } };
51         struct ldlm_resource *res;
52         int count;
53
54         ENTRY;
55         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
56          * export) but disabled through procfs (flag in NS).
57          *
58          * This distinguishes from a case when ELC is not supported originally,
59          * when we still want to cancel locks in advance and just cancel them
60          * locally, without sending any RPC.
61          */
62         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
63                 RETURN(0);
64
65         res = ldlm_resource_get(ns, res_id, 0, 0);
66         if (IS_ERR(res))
67                 RETURN(0);
68         /* Initialize ibits lock policy. */
69         policy.l_inodebits.bits = bits;
70         count = ldlm_cancel_resource_local(res, cancels, &policy, mode, 0, 0,
71                                            NULL);
72         ldlm_resource_putref(res);
73         RETURN(count);
74 }
75
76 int mdc_resource_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
77                                struct list_head *cancels, enum ldlm_mode mode,
78                                __u64 bits)
79 {
80         struct ldlm_res_id res_id;
81
82         fid_build_reg_res_name(fid, &res_id);
83         return mdc_resource_cancel_unused_res(exp, &res_id, cancels, mode, bits);
84 }
85
86 int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
87                 void *ea, size_t ealen, struct ptlrpc_request **request)
88 {
89         LIST_HEAD(cancels);
90         struct ptlrpc_request *req;
91         int count = 0, rc;
92         __u64 bits;
93
94         ENTRY;
95         LASSERT(op_data != NULL);
96
97         bits = MDS_INODELOCK_UPDATE;
98         if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
99                 bits |= MDS_INODELOCK_LOOKUP;
100         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
101             (fid_is_sane(&op_data->op_fid1)))
102                 count = mdc_resource_cancel_unused(exp, &op_data->op_fid1,
103                                                    &cancels, LCK_EX, bits);
104         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
105                                    &RQF_MDS_REINT_SETATTR);
106         if (req == NULL) {
107                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
108                 RETURN(-ENOMEM);
109         }
110
111         req_capsule_set_size(&req->rq_pill, &RMF_MDT_EPOCH, RCL_CLIENT, 0);
112         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, ealen);
113         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT, 0);
114
115         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
116         if (rc) {
117                 ptlrpc_request_free(req);
118                 RETURN(rc);
119         }
120
121         if (op_data->op_attr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
122                 CDEBUG(D_INODE, "setting mtime %lld, ctime %lld\n",
123                        (s64)op_data->op_attr.ia_mtime.tv_sec,
124                        (s64)op_data->op_attr.ia_ctime.tv_sec);
125         mdc_setattr_pack(&req->rq_pill, op_data, ea, ealen);
126
127         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
128
129         ptlrpc_request_set_replen(req);
130
131         rc = mdc_reint(req, LUSTRE_IMP_FULL);
132         if (rc == -ERESTARTSYS)
133                 rc = 0;
134
135         *request = req;
136
137         RETURN(rc);
138 }
139
140 int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
141                 const void *data, size_t datalen,
142                 umode_t mode, uid_t uid, gid_t gid,
143                 kernel_cap_t cap_effective, __u64 rdev,
144                 struct ptlrpc_request **request)
145 {
146         struct ptlrpc_request *req;
147         struct sptlrpc_sepol *sepol;
148         int level, rc;
149         int count, resends = 0;
150         struct obd_import *import = exp->exp_obd->u.cli.cl_import;
151         int generation = import->imp_generation;
152         LIST_HEAD(cancels);
153
154         ENTRY;
155         /* For case if upper layer did not alloc fid, do it now. */
156         if (!fid_is_sane(&op_data->op_fid2)) {
157                 /*
158                  * mdc_fid_alloc() may return errno 1 in case of switch to new
159                  * sequence, handle this.
160                  */
161                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
162                 if (rc < 0)
163                         RETURN(rc);
164         }
165
166 rebuild:
167         count = 0;
168         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
169             (fid_is_sane(&op_data->op_fid1)))
170                 count = mdc_resource_cancel_unused(exp, &op_data->op_fid1,
171                                                    &cancels, LCK_EX,
172                                                    MDS_INODELOCK_UPDATE);
173
174         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
175                                    &RQF_MDS_REINT_CREATE_ACL);
176         if (req == NULL) {
177                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
178                 RETURN(-ENOMEM);
179         }
180
181         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
182                              op_data->op_namelen + 1);
183         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
184                              data && datalen ? datalen : 0);
185
186         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
187                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
188                              strlen(op_data->op_file_secctx_name) + 1 : 0);
189
190         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
191                              op_data->op_file_secctx_size);
192
193         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
194                              op_data->op_file_encctx_size);
195
196         /* get SELinux policy info if any */
197         sepol = sptlrpc_sepol_get(req);
198         if (IS_ERR(sepol))
199                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
200
201         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
202                              sptlrpc_sepol_size(sepol));
203
204         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
205         if (rc)
206                 GOTO(err_put_sepol, rc);
207
208         /*
209          * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
210          * tgt, for symlinks or lov MD data.
211          */
212         mdc_create_pack(&req->rq_pill, op_data, data, datalen, mode, uid,
213                         gid, cap_effective, rdev, sepol);
214
215         sptlrpc_sepol_put(sepol);
216
217         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
218                              exp->exp_obd->u.cli.cl_default_mds_easize);
219         ptlrpc_request_set_replen(req);
220
221         /* ask ptlrpc not to resend on EINPROGRESS since we have our own retry
222          * logic here
223          */
224         req->rq_no_retry_einprogress = 1;
225
226         if (resends) {
227                 req->rq_generation_set = 1;
228                 req->rq_import_generation = generation;
229                 req->rq_sent = ktime_get_real_seconds() + resends;
230         }
231         level = LUSTRE_IMP_FULL;
232  resend:
233         rc = mdc_reint(req, level);
234
235         /* Resend if we were told to. */
236         if (rc == -ERESTARTSYS) {
237                 level = LUSTRE_IMP_RECOVER;
238                 goto resend;
239         } else if (rc == -EINPROGRESS) {
240                 /* Retry create infinitely until succeed or get other
241                  * error code or interrupted.
242                  */
243                 ptlrpc_req_put(req);
244                 if (generation == import->imp_generation) {
245                         if (signal_pending(current))
246                                 RETURN(-EINTR);
247
248                         resends++;
249                         CDEBUG(D_HA, "%s: resend:%d create on "DFID"/"DFID"\n",
250                                exp->exp_obd->obd_name, resends,
251                                PFID(&op_data->op_fid1),
252                                PFID(&op_data->op_fid2));
253                         goto rebuild;
254                 } else {
255                         CDEBUG(D_HA, "resend cross eviction\n");
256                         RETURN(-EIO);
257                 }
258         } else if (rc == 0 && S_ISDIR(mode)) {
259                 struct mdt_body *body;
260
261                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
262                 if (body == NULL) {
263                         rc = -EPROTO;
264                         CERROR("%s: cannot swab mdt_body: rc = %d\n",
265                                exp->exp_obd->obd_name, rc);
266                         RETURN(rc);
267                 }
268
269                 if ((body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_MEA)) ==
270                     (OBD_MD_FLDIREA | OBD_MD_MEA)) {
271                         void *eadata;
272
273                         /* clear valid, because mkdir doesn't need to initialize
274                          * LMV, which will be delayed to lookup.
275                          */
276                         body->mbo_valid &= ~(OBD_MD_FLDIREA | OBD_MD_MEA);
277                         mdc_update_max_ea_from_body(exp, body);
278                         /* The eadata is opaque; just check that it is there.
279                          * Eventually, obd_unpackmd() will check the contents.
280                          */
281                         eadata = req_capsule_server_sized_get(&req->rq_pill,
282                                                           &RMF_MDT_MD,
283                                                           body->mbo_eadatasize);
284                         if (eadata == NULL)
285                                 RETURN(-EPROTO);
286
287                         /* save the reply LMV EA in case we have to replay a
288                          * create for recovery.
289                          */
290                         rc = mdc_save_lmm(req, eadata, body->mbo_eadatasize);
291                 }
292         }
293
294         *request = req;
295
296         RETURN(rc);
297
298 err_put_sepol:
299         sptlrpc_sepol_put(sepol);
300 err_free_rq:
301         ptlrpc_request_free(req);
302
303         RETURN(rc);
304 }
305
306 int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
307                struct ptlrpc_request **request)
308 {
309         LIST_HEAD(cancels);
310         struct obd_device *obd = class_exp2obd(exp);
311         struct ptlrpc_request *req = *request;
312         struct sptlrpc_sepol *sepol;
313         int count = 0, rc;
314
315         ENTRY;
316         LASSERT(req == NULL);
317
318         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
319             (fid_is_sane(&op_data->op_fid1)))
320                 count = mdc_resource_cancel_unused(exp, &op_data->op_fid1,
321                                                    &cancels, LCK_EX,
322                                                    MDS_INODELOCK_UPDATE);
323         if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
324             (fid_is_sane(&op_data->op_fid3)))
325                 /* cancel DOM lock only if it has no data to flush */
326                 count += mdc_resource_cancel_unused(exp, &op_data->op_fid3,
327                                                     &cancels, LCK_EX,
328                                                     op_data->op_cli_flags &
329                                                     CLI_DIRTY_DATA ?
330                                                     MDS_INODELOCK_ELC :
331                                                     MDS_INODELOCK_FULL);
332         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
333                                    &RQF_MDS_REINT_UNLINK);
334         if (req == NULL) {
335                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
336                 RETURN(-ENOMEM);
337         }
338
339         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
340                              op_data->op_namelen + 1);
341
342         /* get SELinux policy info if any */
343         sepol = sptlrpc_sepol_get(req);
344         if (IS_ERR(sepol))
345                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
346
347         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
348                              sptlrpc_sepol_size(sepol));
349
350         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
351         if (rc)
352                 GOTO(err_put_sepol, rc);
353
354         mdc_unlink_pack(&req->rq_pill, op_data, sepol);
355         sptlrpc_sepol_put(sepol);
356
357         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
358                              obd->u.cli.cl_default_mds_easize);
359         ptlrpc_request_set_replen(req);
360
361         *request = req;
362
363         rc = mdc_reint(req, LUSTRE_IMP_FULL);
364         if (rc == -ERESTARTSYS)
365                 rc = 0;
366
367         RETURN(rc);
368
369 err_put_sepol:
370         sptlrpc_sepol_put(sepol);
371 err_free_rq:
372         ptlrpc_request_free(req);
373
374         RETURN(rc);
375 }
376
377 int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
378              struct ptlrpc_request **request)
379 {
380         LIST_HEAD(cancels);
381         struct ptlrpc_request *req;
382         struct sptlrpc_sepol *sepol;
383         int count = 0, rc;
384
385         ENTRY;
386         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
387             (fid_is_sane(&op_data->op_fid2)))
388                 count = mdc_resource_cancel_unused(exp, &op_data->op_fid2,
389                                                    &cancels, LCK_EX,
390                                                    MDS_INODELOCK_UPDATE);
391         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
392             (fid_is_sane(&op_data->op_fid1)))
393                 count += mdc_resource_cancel_unused(exp, &op_data->op_fid1,
394                                                     &cancels, LCK_EX,
395                                                     MDS_INODELOCK_UPDATE);
396
397         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_REINT_LINK);
398         if (req == NULL) {
399                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
400                 RETURN(-ENOMEM);
401         }
402
403         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
404                              op_data->op_namelen + 1);
405
406         /* get SELinux policy info if any */
407         sepol = sptlrpc_sepol_get(req);
408         if (IS_ERR(sepol))
409                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
410
411         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
412                              sptlrpc_sepol_size(sepol));
413
414         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
415         if (rc)
416                 GOTO(err_put_sepol, rc);
417
418         mdc_link_pack(&req->rq_pill, op_data, sepol);
419         sptlrpc_sepol_put(sepol);
420
421         ptlrpc_request_set_replen(req);
422
423         rc = mdc_reint(req, LUSTRE_IMP_FULL);
424         *request = req;
425         if (rc == -ERESTARTSYS)
426                 rc = 0;
427
428         RETURN(rc);
429
430 err_put_sepol:
431         sptlrpc_sepol_put(sepol);
432 err_free_rq:
433         ptlrpc_request_free(req);
434
435         RETURN(rc);
436 }
437
438 int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
439                 const char *old, size_t oldlen, const char *new, size_t newlen,
440                 struct ptlrpc_request **request)
441 {
442         LIST_HEAD(cancels);
443         struct obd_device *obd = exp->exp_obd;
444         struct ptlrpc_request *req;
445         struct sptlrpc_sepol *sepol;
446         int count = 0, rc;
447
448         ENTRY;
449         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
450             (fid_is_sane(&op_data->op_fid1)))
451                 count = mdc_resource_cancel_unused(exp, &op_data->op_fid1,
452                                                    &cancels, LCK_EX,
453                                                    MDS_INODELOCK_UPDATE);
454         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
455             (fid_is_sane(&op_data->op_fid2)))
456                 count += mdc_resource_cancel_unused(exp, &op_data->op_fid2,
457                                                     &cancels, LCK_EX,
458                                                     MDS_INODELOCK_UPDATE);
459         if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
460             (fid_is_sane(&op_data->op_fid3)))
461                 count += mdc_resource_cancel_unused(exp, &op_data->op_fid3,
462                                                     &cancels, LCK_EX,
463                                                     MDS_INODELOCK_LOOKUP);
464         if ((op_data->op_flags & MF_MDC_CANCEL_FID4) &&
465             (fid_is_sane(&op_data->op_fid4)))
466                 count += mdc_resource_cancel_unused(exp, &op_data->op_fid4,
467                                                     &cancels, LCK_EX,
468                                                     MDS_INODELOCK_ELC);
469
470         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
471                            op_data->op_cli_flags & CLI_MIGRATE ?
472                            &RQF_MDS_REINT_MIGRATE : &RQF_MDS_REINT_RENAME);
473         if (req == NULL) {
474                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
475                 RETURN(-ENOMEM);
476         }
477
478         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
479         req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
480         if (op_data->op_cli_flags & CLI_MIGRATE)
481                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
482                                      op_data->op_data_size);
483
484         /* get SELinux policy info if any */
485         sepol = sptlrpc_sepol_get(req);
486         if (IS_ERR(sepol))
487                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
488
489         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
490                              sptlrpc_sepol_size(sepol));
491
492         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
493         if (rc)
494                 GOTO(err_put_sepol, rc);
495
496         if (op_data->op_cli_flags & CLI_MIGRATE)
497                 mdc_migrate_pack(&req->rq_pill, op_data, old, oldlen);
498         else
499                 mdc_rename_pack(&req->rq_pill, op_data, old, oldlen,
500                                 new, newlen, sepol);
501
502         sptlrpc_sepol_put(sepol);
503
504         /* LU-17441: avoid blocking MDS_REQUEST_PORTAL for renames with BFL */
505 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 20, 53, 0)
506         /* MDS_IO_PORTAL available since v2_10_53_0-33-g2bcc5ad0ed */
507         if ((exp_connect_flags(exp) &
508              (OBD_CONNECT_GRANT | OBD_CONNECT_SRVLOCK)) ==
509             (OBD_CONNECT_GRANT | OBD_CONNECT_SRVLOCK))
510 #endif
511                 req->rq_request_portal = MDS_IO_PORTAL;
512         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
513                              obd->u.cli.cl_default_mds_easize);
514         ptlrpc_request_set_replen(req);
515
516         rc = mdc_reint(req, LUSTRE_IMP_FULL);
517         *request = req;
518         if (rc == -ERESTARTSYS)
519                 rc = 0;
520
521         RETURN(rc);
522
523 err_put_sepol:
524         sptlrpc_sepol_put(sepol);
525 err_free_rq:
526         ptlrpc_request_free(req);
527
528         RETURN(rc);
529 }
530
531 int mdc_file_resync(struct obd_export *exp, struct md_op_data *op_data)
532 {
533         LIST_HEAD(cancels);
534         struct ptlrpc_request *req;
535         struct ldlm_lock *lock;
536         struct mdt_rec_resync *rec;
537         int count = 0, rc;
538
539         ENTRY;
540         if (op_data->op_flags & MF_MDC_CANCEL_FID1 &&
541             fid_is_sane(&op_data->op_fid1))
542                 count = mdc_resource_cancel_unused(exp, &op_data->op_fid1,
543                                                    &cancels, LCK_EX,
544                                                    MDS_INODELOCK_LAYOUT);
545
546         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
547                                    &RQF_MDS_REINT_RESYNC);
548         if (req == NULL) {
549                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
550                 RETURN(-ENOMEM);
551         }
552
553         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
554         if (rc) {
555                 ptlrpc_request_free(req);
556                 RETURN(rc);
557         }
558
559         BUILD_BUG_ON(sizeof(*rec) != sizeof(struct mdt_rec_reint));
560         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
561         rec->rs_opcode  = REINT_RESYNC;
562         rec->rs_fsuid   = op_data->op_fsuid;
563         rec->rs_fsgid   = op_data->op_fsgid;
564         rec->rs_cap     = ll_capability_u32(op_data->op_cap);
565         rec->rs_fid     = op_data->op_fid1;
566         rec->rs_bias    = op_data->op_bias;
567         if (exp_connect_mirror_id_fix(exp))
568                 rec->rs_mirror_id_new = op_data->op_mirror_id;
569         else
570                 rec->rs_mirror_id_old = op_data->op_mirror_id;
571
572         lock = ldlm_handle2lock(&op_data->op_lease_handle);
573         if (lock != NULL) {
574                 rec->rs_lease_handle = lock->l_remote_handle;
575                 ldlm_lock_put(lock);
576         }
577
578         ptlrpc_request_set_replen(req);
579
580         rc = mdc_reint(req, LUSTRE_IMP_FULL);
581         if (rc == -ERESTARTSYS)
582                 rc = 0;
583
584         ptlrpc_req_put(req);
585         RETURN(rc);
586 }