Whamcloud - gitweb
LU-7433 ldlm: xattr locks are lost on mdt
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LMV
38 #include <linux/slab.h>
39 #include <linux/module.h>
40 #include <linux/init.h>
41 #include <linux/user_namespace.h>
42 #ifdef HAVE_UIDGID_HEADER
43 # include <linux/uidgid.h>
44 #endif
45 #include <linux/slab.h>
46 #include <linux/pagemap.h>
47 #include <linux/mm.h>
48 #include <linux/math64.h>
49 #include <linux/seq_file.h>
50 #include <linux/namei.h>
51
52 #include <lustre/lustre_idl.h>
53 #include <obd_support.h>
54 #include <lustre_lib.h>
55 #include <lustre_net.h>
56 #include <obd_class.h>
57 #include <lustre_lmv.h>
58 #include <lprocfs_status.h>
59 #include <cl_object.h>
60 #include <lustre_fid.h>
61 #include <lustre_ioctl.h>
62 #include <lustre_kernelcomm.h>
63 #include "lmv_internal.h"
64
65 static int lmv_check_connect(struct obd_device *obd);
66
67 static void lmv_activate_target(struct lmv_obd *lmv,
68                                 struct lmv_tgt_desc *tgt,
69                                 int activate)
70 {
71         if (tgt->ltd_active == activate)
72                 return;
73
74         tgt->ltd_active = activate;
75         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
76
77         tgt->ltd_exp->exp_obd->obd_inactive = !activate;
78 }
79
80 /**
81  * Error codes:
82  *
83  *  -EINVAL  : UUID can't be found in the LMV's target list
84  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
85  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
86  */
87 static int lmv_set_mdc_active(struct lmv_obd *lmv,
88                               const struct obd_uuid *uuid,
89                               int activate)
90 {
91         struct lmv_tgt_desc     *tgt = NULL;
92         struct obd_device       *obd;
93         __u32                    i;
94         int                      rc = 0;
95         ENTRY;
96
97         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
98                         lmv, uuid->uuid, activate);
99
100         spin_lock(&lmv->lmv_lock);
101         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
102                 tgt = lmv->tgts[i];
103                 if (tgt == NULL || tgt->ltd_exp == NULL)
104                         continue;
105
106                 CDEBUG(D_INFO, "Target idx %d is %s conn %#llx\n", i,
107                        tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
108
109                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
110                         break;
111         }
112
113         if (i == lmv->desc.ld_tgt_count)
114                 GOTO(out_lmv_lock, rc = -EINVAL);
115
116         obd = class_exp2obd(tgt->ltd_exp);
117         if (obd == NULL)
118                 GOTO(out_lmv_lock, rc = -ENOTCONN);
119
120         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
121                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
122                obd->obd_type->typ_name, i);
123         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
124
125         if (tgt->ltd_active == activate) {
126                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
127                        activate ? "" : "in");
128                 GOTO(out_lmv_lock, rc);
129         }
130
131         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
132                activate ? "" : "in");
133         lmv_activate_target(lmv, tgt, activate);
134         EXIT;
135
136  out_lmv_lock:
137         spin_unlock(&lmv->lmv_lock);
138         return rc;
139 }
140
141 struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
142 {
143         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
144         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
145
146         return (tgt == NULL) ? NULL : obd_get_uuid(tgt->ltd_exp);
147 }
148
149 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
150                       enum obd_notify_event ev, void *data)
151 {
152         struct obd_connect_data *conn_data;
153         struct lmv_obd          *lmv = &obd->u.lmv;
154         struct obd_uuid         *uuid;
155         int                      rc = 0;
156         ENTRY;
157
158         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
159                 CERROR("unexpected notification of %s %s!\n",
160                        watched->obd_type->typ_name,
161                        watched->obd_name);
162                 RETURN(-EINVAL);
163         }
164
165         uuid = &watched->u.cli.cl_target_uuid;
166         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
167                 /*
168                  * Set MDC as active before notifying the observer, so the
169                  * observer can use the MDC normally.
170                  */
171                 rc = lmv_set_mdc_active(lmv, uuid,
172                                         ev == OBD_NOTIFY_ACTIVE);
173                 if (rc) {
174                         CERROR("%sactivation of %s failed: %d\n",
175                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
176                                uuid->uuid, rc);
177                         RETURN(rc);
178                 }
179         } else if (ev == OBD_NOTIFY_OCD) {
180                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
181                 /*
182                  * XXX: Make sure that ocd_connect_flags from all targets are
183                  * the same. Otherwise one of MDTs runs wrong version or
184                  * something like this.  --umka
185                  */
186                 obd->obd_self_export->exp_connect_data = *conn_data;
187         }
188
189         /*
190          * Pass the notification up the chain.
191          */
192         if (obd->obd_observer)
193                 rc = obd_notify(obd->obd_observer, watched, ev, data);
194
195         RETURN(rc);
196 }
197
198 static int lmv_connect(const struct lu_env *env,
199                        struct obd_export **pexp, struct obd_device *obd,
200                        struct obd_uuid *cluuid, struct obd_connect_data *data,
201                        void *localdata)
202 {
203         struct lmv_obd *lmv = &obd->u.lmv;
204         struct lustre_handle conn = { 0 };
205         struct obd_export *exp;
206         int rc;
207         ENTRY;
208
209         rc = class_connect(&conn, obd, cluuid);
210         if (rc) {
211                 CERROR("class_connection() returned %d\n", rc);
212                 RETURN(rc);
213         }
214
215         exp = class_conn2export(&conn);
216
217         lmv->connected = 0;
218         lmv->cluuid = *cluuid;
219         lmv->conn_data = *data;
220
221         if (lmv->targets_proc_entry == NULL) {
222                 lmv->targets_proc_entry = lprocfs_register("target_obds",
223                                                            obd->obd_proc_entry,
224                                                            NULL, NULL);
225                 if (IS_ERR(lmv->targets_proc_entry)) {
226                         CERROR("%s: cannot register "
227                                "/proc/fs/lustre/%s/%s/target_obds\n",
228                                obd->obd_name, obd->obd_type->typ_name,
229                                obd->obd_name);
230                         lmv->targets_proc_entry = NULL;
231                 }
232         }
233
234         rc = lmv_check_connect(obd);
235         if (rc != 0)
236                 GOTO(out_proc, rc);
237
238         *pexp = exp;
239
240         RETURN(rc);
241
242 out_proc:
243         if (lmv->targets_proc_entry != NULL)
244                 lprocfs_remove(&lmv->targets_proc_entry);
245
246         class_disconnect(exp);
247
248         return rc;
249 }
250
251 static int lmv_init_ea_size(struct obd_export *exp, __u32 easize,
252                             __u32 def_easize)
253 {
254         struct obd_device       *obd = exp->exp_obd;
255         struct lmv_obd          *lmv = &obd->u.lmv;
256         __u32                    i;
257         int                      rc = 0;
258         int                      change = 0;
259         ENTRY;
260
261         if (lmv->max_easize < easize) {
262                 lmv->max_easize = easize;
263                 change = 1;
264         }
265         if (lmv->max_def_easize < def_easize) {
266                 lmv->max_def_easize = def_easize;
267                 change = 1;
268         }
269
270         if (change == 0)
271                 RETURN(0);
272
273         if (lmv->connected == 0)
274                 RETURN(0);
275
276         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
277                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
278
279                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
280                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
281                         continue;
282                 }
283
284                 rc = md_init_ea_size(tgt->ltd_exp, easize, def_easize);
285                 if (rc) {
286                         CERROR("%s: obd_init_ea_size() failed on MDT target %d:"
287                                " rc = %d\n", obd->obd_name, i, rc);
288                         break;
289                 }
290         }
291         RETURN(rc);
292 }
293
294 #define MAX_STRING_SIZE 128
295
296 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
297 {
298         struct lmv_obd          *lmv = &obd->u.lmv;
299         struct obd_uuid         *cluuid = &lmv->cluuid;
300         struct obd_uuid          lmv_mdc_uuid = { "LMV_MDC_UUID" };
301         struct obd_device       *mdc_obd;
302         struct obd_export       *mdc_exp;
303         struct lu_fld_target     target;
304         int                      rc;
305         ENTRY;
306
307         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
308                                         &obd->obd_uuid);
309         if (!mdc_obd) {
310                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
311                 RETURN(-EINVAL);
312         }
313
314         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
315                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
316                 tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
317                 cluuid->uuid);
318
319         if (!mdc_obd->obd_set_up) {
320                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
321                 RETURN(-EINVAL);
322         }
323
324         rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
325                          &lmv->conn_data, NULL);
326         if (rc) {
327                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
328                 RETURN(rc);
329         }
330
331         /*
332          * Init fid sequence client for this mdc and add new fld target.
333          */
334         rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
335         if (rc)
336                 RETURN(rc);
337
338         target.ft_srv = NULL;
339         target.ft_exp = mdc_exp;
340         target.ft_idx = tgt->ltd_idx;
341
342         fld_client_add_target(&lmv->lmv_fld, &target);
343
344         rc = obd_register_observer(mdc_obd, obd);
345         if (rc) {
346                 obd_disconnect(mdc_exp);
347                 CERROR("target %s register_observer error %d\n",
348                        tgt->ltd_uuid.uuid, rc);
349                 RETURN(rc);
350         }
351
352         if (obd->obd_observer) {
353                 /*
354                  * Tell the observer about the new target.
355                  */
356                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
357                                 OBD_NOTIFY_ACTIVE,
358                                 (void *)(tgt - lmv->tgts[0]));
359                 if (rc) {
360                         obd_disconnect(mdc_exp);
361                         RETURN(rc);
362                 }
363         }
364
365         tgt->ltd_active = 1;
366         tgt->ltd_exp = mdc_exp;
367         lmv->desc.ld_active_tgt_count++;
368
369         md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize);
370
371         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
372                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
373                 atomic_read(&obd->obd_refcount));
374
375         if (lmv->targets_proc_entry != NULL) {
376                 struct proc_dir_entry *mdc_symlink;
377
378                 LASSERT(mdc_obd->obd_type != NULL);
379                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
380                 mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name,
381                                                   lmv->targets_proc_entry,
382                                                   "../../../%s/%s",
383                                                   mdc_obd->obd_type->typ_name,
384                                                   mdc_obd->obd_name);
385                 if (mdc_symlink == NULL) {
386                         CERROR("cannot register LMV target "
387                                "/proc/fs/lustre/%s/%s/target_obds/%s\n",
388                                obd->obd_type->typ_name, obd->obd_name,
389                                mdc_obd->obd_name);
390                 }
391         }
392         RETURN(0);
393 }
394
395 static void lmv_del_target(struct lmv_obd *lmv, int index)
396 {
397         if (lmv->tgts[index] == NULL)
398                 return;
399
400         OBD_FREE_PTR(lmv->tgts[index]);
401         lmv->tgts[index] = NULL;
402         return;
403 }
404
405 static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
406                            __u32 index, int gen)
407 {
408         struct obd_device *mdc_obd;
409         struct lmv_obd      *lmv = &obd->u.lmv;
410         struct lmv_tgt_desc *tgt;
411         int                  orig_tgt_count = 0;
412         int                  rc = 0;
413         ENTRY;
414
415         CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
416         mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
417                                         &obd->obd_uuid);
418         if (!mdc_obd) {
419                 CERROR("%s: Target %s not attached: rc = %d\n",
420                        obd->obd_name, uuidp->uuid, -EINVAL);
421                 RETURN(-EINVAL);
422         }
423
424         mutex_lock(&lmv->lmv_init_mutex);
425         if ((index < lmv->tgts_size) && (lmv->tgts[index] != NULL)) {
426                 tgt = lmv->tgts[index];
427                 CERROR("%s: UUID %s already assigned at LOV target index %d:"
428                        " rc = %d\n", obd->obd_name,
429                        obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
430                 mutex_unlock(&lmv->lmv_init_mutex);
431                 RETURN(-EEXIST);
432         }
433
434         if (index >= lmv->tgts_size) {
435                 /* We need to reallocate the lmv target array. */
436                 struct lmv_tgt_desc **newtgts, **old = NULL;
437                 __u32 newsize = 1;
438                 __u32 oldsize = 0;
439
440                 while (newsize < index + 1)
441                         newsize = newsize << 1;
442                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
443                 if (newtgts == NULL) {
444                         mutex_unlock(&lmv->lmv_init_mutex);
445                         RETURN(-ENOMEM);
446                 }
447
448                 if (lmv->tgts_size) {
449                         memcpy(newtgts, lmv->tgts,
450                                sizeof(*newtgts) * lmv->tgts_size);
451                         old = lmv->tgts;
452                         oldsize = lmv->tgts_size;
453                 }
454
455                 lmv->tgts = newtgts;
456                 lmv->tgts_size = newsize;
457                 smp_rmb();
458                 if (old)
459                         OBD_FREE(old, sizeof(*old) * oldsize);
460
461                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts,
462                        lmv->tgts_size);
463         }
464
465         OBD_ALLOC_PTR(tgt);
466         if (!tgt) {
467                 mutex_unlock(&lmv->lmv_init_mutex);
468                 RETURN(-ENOMEM);
469         }
470
471         mutex_init(&tgt->ltd_fid_mutex);
472         tgt->ltd_idx = index;
473         tgt->ltd_uuid = *uuidp;
474         tgt->ltd_active = 0;
475         lmv->tgts[index] = tgt;
476         if (index >= lmv->desc.ld_tgt_count) {
477                 orig_tgt_count = lmv->desc.ld_tgt_count;
478                 lmv->desc.ld_tgt_count = index + 1;
479         }
480
481         if (lmv->connected == 0) {
482                 /* lmv_check_connect() will connect this target. */
483                 mutex_unlock(&lmv->lmv_init_mutex);
484                 RETURN(0);
485         }
486
487         /* Otherwise let's connect it ourselves */
488         mutex_unlock(&lmv->lmv_init_mutex);
489         rc = lmv_connect_mdc(obd, tgt);
490         if (rc != 0) {
491                 spin_lock(&lmv->lmv_lock);
492                 if (lmv->desc.ld_tgt_count == index + 1)
493                         lmv->desc.ld_tgt_count = orig_tgt_count;
494                 memset(tgt, 0, sizeof(*tgt));
495                 spin_unlock(&lmv->lmv_lock);
496         } else {
497                 int easize = sizeof(struct lmv_stripe_md) +
498                         lmv->desc.ld_tgt_count * sizeof(struct lu_fid);
499                 lmv_init_ea_size(obd->obd_self_export, easize, 0);
500         }
501
502         RETURN(rc);
503 }
504
505 static int lmv_check_connect(struct obd_device *obd)
506 {
507         struct lmv_obd          *lmv = &obd->u.lmv;
508         struct lmv_tgt_desc     *tgt;
509         __u32                    i;
510         int                      rc;
511         int                      easize;
512         ENTRY;
513
514         if (lmv->connected)
515                 RETURN(0);
516
517         mutex_lock(&lmv->lmv_init_mutex);
518         if (lmv->connected) {
519                 mutex_unlock(&lmv->lmv_init_mutex);
520                 RETURN(0);
521         }
522
523         if (lmv->desc.ld_tgt_count == 0) {
524                 mutex_unlock(&lmv->lmv_init_mutex);
525                 CERROR("%s: no targets configured.\n", obd->obd_name);
526                 RETURN(-EINVAL);
527         }
528
529         LASSERT(lmv->tgts != NULL);
530
531         if (lmv->tgts[0] == NULL) {
532                 mutex_unlock(&lmv->lmv_init_mutex);
533                 CERROR("%s: no target configured for index 0.\n",
534                        obd->obd_name);
535                 RETURN(-EINVAL);
536         }
537
538         CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
539                lmv->cluuid.uuid, obd->obd_name);
540
541         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
542                 tgt = lmv->tgts[i];
543                 if (tgt == NULL)
544                         continue;
545                 rc = lmv_connect_mdc(obd, tgt);
546                 if (rc)
547                         GOTO(out_disc, rc);
548         }
549
550         lmv->connected = 1;
551         easize = lmv_mds_md_size(lmv->desc.ld_tgt_count, LMV_MAGIC);
552         lmv_init_ea_size(obd->obd_self_export, easize, 0);
553         mutex_unlock(&lmv->lmv_init_mutex);
554         RETURN(0);
555
556  out_disc:
557         while (i-- > 0) {
558                 int rc2;
559                 tgt = lmv->tgts[i];
560                 if (tgt == NULL)
561                         continue;
562                 tgt->ltd_active = 0;
563                 if (tgt->ltd_exp) {
564                         --lmv->desc.ld_active_tgt_count;
565                         rc2 = obd_disconnect(tgt->ltd_exp);
566                         if (rc2) {
567                                 CERROR("LMV target %s disconnect on "
568                                        "MDC idx %d: error %d\n",
569                                        tgt->ltd_uuid.uuid, i, rc2);
570                         }
571                 }
572         }
573
574         mutex_unlock(&lmv->lmv_init_mutex);
575
576         RETURN(rc);
577 }
578
579 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
580 {
581         struct lmv_obd         *lmv = &obd->u.lmv;
582         struct obd_device      *mdc_obd;
583         int                     rc;
584         ENTRY;
585
586         LASSERT(tgt != NULL);
587         LASSERT(obd != NULL);
588
589         mdc_obd = class_exp2obd(tgt->ltd_exp);
590
591         if (mdc_obd) {
592                 mdc_obd->obd_force = obd->obd_force;
593                 mdc_obd->obd_fail = obd->obd_fail;
594                 mdc_obd->obd_no_recov = obd->obd_no_recov;
595
596                 if (lmv->targets_proc_entry != NULL)
597                         lprocfs_remove_proc_entry(mdc_obd->obd_name,
598                                                   lmv->targets_proc_entry);
599         }
600
601         rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
602         if (rc)
603                 CERROR("Can't finanize fids factory\n");
604
605         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
606                tgt->ltd_exp->exp_obd->obd_name,
607                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
608
609         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
610         rc = obd_disconnect(tgt->ltd_exp);
611         if (rc) {
612                 if (tgt->ltd_active) {
613                         CERROR("Target %s disconnect error %d\n",
614                                tgt->ltd_uuid.uuid, rc);
615                 }
616         }
617
618         lmv_activate_target(lmv, tgt, 0);
619         tgt->ltd_exp = NULL;
620         RETURN(0);
621 }
622
623 static int lmv_disconnect(struct obd_export *exp)
624 {
625         struct obd_device       *obd = class_exp2obd(exp);
626         struct lmv_obd          *lmv = &obd->u.lmv;
627         int                      rc;
628         __u32                    i;
629         ENTRY;
630
631         if (!lmv->tgts)
632                 goto out_local;
633
634         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
635                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
636                         continue;
637
638                 lmv_disconnect_mdc(obd, lmv->tgts[i]);
639         }
640
641         if (lmv->targets_proc_entry != NULL)
642                 lprocfs_remove(&lmv->targets_proc_entry);
643         else
644                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
645                        obd->obd_type->typ_name, obd->obd_name);
646
647 out_local:
648         /*
649          * This is the case when no real connection is established by
650          * lmv_check_connect().
651          */
652         if (!lmv->connected)
653                 class_export_put(exp);
654         rc = class_disconnect(exp);
655         lmv->connected = 0;
656
657         RETURN(rc);
658 }
659
660 static int lmv_fid2path(struct obd_export *exp, int len, void *karg,
661                         void __user *uarg)
662 {
663         struct obd_device       *obddev = class_exp2obd(exp);
664         struct lmv_obd          *lmv = &obddev->u.lmv;
665         struct getinfo_fid2path *gf;
666         struct lmv_tgt_desc     *tgt;
667         struct getinfo_fid2path *remote_gf = NULL;
668         struct lu_fid           root_fid;
669         int                     remote_gf_size = 0;
670         int                     rc;
671
672         gf = karg;
673         tgt = lmv_find_target(lmv, &gf->gf_fid);
674         if (IS_ERR(tgt))
675                 RETURN(PTR_ERR(tgt));
676
677         root_fid = *gf->gf_u.gf_root_fid;
678         LASSERT(fid_is_sane(&root_fid));
679
680 repeat_fid2path:
681         rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
682         if (rc != 0 && rc != -EREMOTE)
683                 GOTO(out_fid2path, rc);
684
685         /* If remote_gf != NULL, it means just building the
686          * path on the remote MDT, copy this path segement to gf */
687         if (remote_gf != NULL) {
688                 struct getinfo_fid2path *ori_gf;
689                 char *ptr;
690
691                 ori_gf = (struct getinfo_fid2path *)karg;
692                 if (strlen(ori_gf->gf_u.gf_path) +
693                     strlen(gf->gf_u.gf_path) > ori_gf->gf_pathlen)
694                         GOTO(out_fid2path, rc = -EOVERFLOW);
695
696                 ptr = ori_gf->gf_u.gf_path;
697
698                 memmove(ptr + strlen(gf->gf_u.gf_path) + 1, ptr,
699                         strlen(ori_gf->gf_u.gf_path));
700
701                 strncpy(ptr, gf->gf_u.gf_path,
702                         strlen(gf->gf_u.gf_path));
703                 ptr += strlen(gf->gf_u.gf_path);
704                 *ptr = '/';
705         }
706
707         CDEBUG(D_INFO, "%s: get path %s "DFID" rec: %llu ln: %u\n",
708                tgt->ltd_exp->exp_obd->obd_name,
709                gf->gf_u.gf_path, PFID(&gf->gf_fid), gf->gf_recno,
710                gf->gf_linkno);
711
712         if (rc == 0)
713                 GOTO(out_fid2path, rc);
714
715         /* sigh, has to go to another MDT to do path building further */
716         if (remote_gf == NULL) {
717                 remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
718                 OBD_ALLOC(remote_gf, remote_gf_size);
719                 if (remote_gf == NULL)
720                         GOTO(out_fid2path, rc = -ENOMEM);
721                 remote_gf->gf_pathlen = PATH_MAX;
722         }
723
724         if (!fid_is_sane(&gf->gf_fid)) {
725                 CERROR("%s: invalid FID "DFID": rc = %d\n",
726                        tgt->ltd_exp->exp_obd->obd_name,
727                        PFID(&gf->gf_fid), -EINVAL);
728                 GOTO(out_fid2path, rc = -EINVAL);
729         }
730
731         tgt = lmv_find_target(lmv, &gf->gf_fid);
732         if (IS_ERR(tgt))
733                 GOTO(out_fid2path, rc = -EINVAL);
734
735         remote_gf->gf_fid = gf->gf_fid;
736         remote_gf->gf_recno = -1;
737         remote_gf->gf_linkno = -1;
738         memset(remote_gf->gf_u.gf_path, 0, remote_gf->gf_pathlen);
739         *remote_gf->gf_u.gf_root_fid = root_fid;
740         gf = remote_gf;
741         goto repeat_fid2path;
742
743 out_fid2path:
744         if (remote_gf != NULL)
745                 OBD_FREE(remote_gf, remote_gf_size);
746         RETURN(rc);
747 }
748
749 static int lmv_hsm_req_count(struct lmv_obd *lmv,
750                              const struct hsm_user_request *hur,
751                              const struct lmv_tgt_desc *tgt_mds)
752 {
753         __u32                    i;
754         int                      nr = 0;
755         struct lmv_tgt_desc     *curr_tgt;
756
757         /* count how many requests must be sent to the given target */
758         for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
759                 curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid);
760                 if (IS_ERR(curr_tgt))
761                         RETURN(PTR_ERR(curr_tgt));
762                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
763                         nr++;
764         }
765         return nr;
766 }
767
768 static int lmv_hsm_req_build(struct lmv_obd *lmv,
769                               struct hsm_user_request *hur_in,
770                               const struct lmv_tgt_desc *tgt_mds,
771                               struct hsm_user_request *hur_out)
772 {
773         __u32                    i, nr_out;
774         struct lmv_tgt_desc     *curr_tgt;
775
776         /* build the hsm_user_request for the given target */
777         hur_out->hur_request = hur_in->hur_request;
778         nr_out = 0;
779         for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
780                 curr_tgt = lmv_find_target(lmv,
781                                            &hur_in->hur_user_item[i].hui_fid);
782                 if (IS_ERR(curr_tgt))
783                         RETURN(PTR_ERR(curr_tgt));
784                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
785                         hur_out->hur_user_item[nr_out] =
786                                                 hur_in->hur_user_item[i];
787                         nr_out++;
788                 }
789         }
790         hur_out->hur_request.hr_itemcount = nr_out;
791         memcpy(hur_data(hur_out), hur_data(hur_in),
792                hur_in->hur_request.hr_data_len);
793
794         RETURN(0);
795 }
796
797 static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len,
798                                  struct lustre_kernelcomm *lk,
799                                  void __user *uarg)
800 {
801         __u32   i;
802         int     rc;
803         ENTRY;
804
805         /* unregister request (call from llapi_hsm_copytool_fini) */
806         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
807                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
808
809                 if (tgt == NULL || tgt->ltd_exp == NULL)
810                         continue;
811                 /* best effort: try to clean as much as possible
812                  * (continue on error) */
813                 obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
814         }
815
816         /* Whatever the result, remove copytool from kuc groups.
817          * Unreached coordinators will get EPIPE on next requests
818          * and will unregister automatically.
819          */
820         rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group);
821
822         RETURN(rc);
823 }
824
825 static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len,
826                                struct lustre_kernelcomm *lk, __user void *uarg)
827 {
828         struct file             *filp;
829         __u32                    i, j;
830         int                      err, rc;
831         bool                     any_set = false;
832         struct kkuc_ct_data      kcd = { 0 };
833         ENTRY;
834
835         /* All or nothing: try to register to all MDS.
836          * In case of failure, unregister from previous MDS,
837          * except if it because of inactive target. */
838         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
839                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
840
841                 if (tgt == NULL || tgt->ltd_exp == NULL)
842                         continue;
843                 err = obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
844                 if (err) {
845                         if (tgt->ltd_active) {
846                                 /* permanent error */
847                                 CERROR("%s: iocontrol MDC %s on MDT"
848                                        " idx %d cmd %x: err = %d\n",
849                                        lmv2obd_dev(lmv)->obd_name,
850                                        tgt->ltd_uuid.uuid, i, cmd, err);
851                                 rc = err;
852                                 lk->lk_flags |= LK_FLG_STOP;
853                                 /* unregister from previous MDS */
854                                 for (j = 0; j < i; j++) {
855                                         tgt = lmv->tgts[j];
856                                         if (tgt == NULL || tgt->ltd_exp == NULL)
857                                                 continue;
858                                         obd_iocontrol(cmd, tgt->ltd_exp, len,
859                                                       lk, uarg);
860                                 }
861                                 RETURN(rc);
862                         }
863                         /* else: transient error.
864                          * kuc will register to the missing MDT
865                          * when it is back */
866                 } else {
867                         any_set = true;
868                 }
869         }
870
871         if (!any_set)
872                 /* no registration done: return error */
873                 RETURN(-ENOTCONN);
874
875         /* at least one registration done, with no failure */
876         filp = fget(lk->lk_wfd);
877         if (filp == NULL)
878                 RETURN(-EBADF);
879
880         kcd.kcd_magic = KKUC_CT_DATA_MAGIC;
881         kcd.kcd_uuid = lmv->cluuid;
882         kcd.kcd_archive = lk->lk_data;
883
884         rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group,
885                                    &kcd, sizeof(kcd));
886         if (rc != 0)
887                 fput(filp);
888
889         RETURN(rc);
890 }
891
892
893
894
895 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
896                          int len, void *karg, void __user *uarg)
897 {
898         struct obd_device       *obddev = class_exp2obd(exp);
899         struct lmv_obd          *lmv = &obddev->u.lmv;
900         struct lmv_tgt_desc     *tgt = NULL;
901         __u32                    i = 0;
902         int                      rc = 0;
903         int                      set = 0;
904         __u32                    count = lmv->desc.ld_tgt_count;
905         ENTRY;
906
907         if (count == 0)
908                 RETURN(-ENOTTY);
909
910         switch (cmd) {
911         case IOC_OBD_STATFS: {
912                 struct obd_ioctl_data *data = karg;
913                 struct obd_device *mdc_obd;
914                 struct obd_statfs stat_buf = {0};
915                 __u32 index;
916
917                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
918                 if ((index >= count))
919                         RETURN(-ENODEV);
920
921                 tgt = lmv->tgts[index];
922                 if (tgt == NULL || !tgt->ltd_active)
923                         RETURN(-ENODATA);
924
925                 mdc_obd = class_exp2obd(tgt->ltd_exp);
926                 if (!mdc_obd)
927                         RETURN(-EINVAL);
928
929                 /* copy UUID */
930                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
931                                  min((int) data->ioc_plen2,
932                                      (int) sizeof(struct obd_uuid))))
933                         RETURN(-EFAULT);
934
935                 rc = obd_statfs(NULL, tgt->ltd_exp, &stat_buf,
936                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
937                                 0);
938                 if (rc)
939                         RETURN(rc);
940                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
941                                  min((int) data->ioc_plen1,
942                                      (int) sizeof(stat_buf))))
943                         RETURN(-EFAULT);
944                 break;
945         }
946         case OBD_IOC_QUOTACTL: {
947                 struct if_quotactl *qctl = karg;
948                 struct obd_quotactl *oqctl;
949
950                 if (qctl->qc_valid == QC_MDTIDX) {
951                         if (count <= qctl->qc_idx)
952                                 RETURN(-EINVAL);
953
954                         tgt = lmv->tgts[qctl->qc_idx];
955                         if (tgt == NULL || tgt->ltd_exp == NULL)
956                                 RETURN(-EINVAL);
957                 } else if (qctl->qc_valid == QC_UUID) {
958                         for (i = 0; i < count; i++) {
959                                 tgt = lmv->tgts[i];
960                                 if (tgt == NULL)
961                                         continue;
962                                 if (!obd_uuid_equals(&tgt->ltd_uuid,
963                                                      &qctl->obd_uuid))
964                                         continue;
965
966                                 if (tgt->ltd_exp == NULL)
967                                         RETURN(-EINVAL);
968
969                                 break;
970                         }
971                 } else {
972                         RETURN(-EINVAL);
973                 }
974
975                 if (i >= count)
976                         RETURN(-EAGAIN);
977
978                 LASSERT(tgt != NULL && tgt->ltd_exp != NULL);
979                 OBD_ALLOC_PTR(oqctl);
980                 if (!oqctl)
981                         RETURN(-ENOMEM);
982
983                 QCTL_COPY(oqctl, qctl);
984                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
985                 if (rc == 0) {
986                         QCTL_COPY(qctl, oqctl);
987                         qctl->qc_valid = QC_MDTIDX;
988                         qctl->obd_uuid = tgt->ltd_uuid;
989                 }
990                 OBD_FREE_PTR(oqctl);
991                 break;
992         }
993         case OBD_IOC_CHANGELOG_SEND:
994         case OBD_IOC_CHANGELOG_CLEAR: {
995                 struct ioc_changelog *icc = karg;
996
997                 if (icc->icc_mdtindex >= count)
998                         RETURN(-ENODEV);
999
1000                 tgt = lmv->tgts[icc->icc_mdtindex];
1001                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
1002                         RETURN(-ENODEV);
1003                 rc = obd_iocontrol(cmd, tgt->ltd_exp, sizeof(*icc), icc, NULL);
1004                 break;
1005         }
1006         case LL_IOC_GET_CONNECT_FLAGS: {
1007                 tgt = lmv->tgts[0];
1008                 if (tgt == NULL || tgt->ltd_exp == NULL)
1009                         RETURN(-ENODATA);
1010                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1011                 break;
1012         }
1013         case LL_IOC_FID2MDTIDX: {
1014                 struct lu_fid *fid = karg;
1015                 int             mdt_index;
1016
1017                 rc = lmv_fld_lookup(lmv, fid, &mdt_index);
1018                 if (rc != 0)
1019                         RETURN(rc);
1020
1021                 /* Note: this is from llite(see ll_dir_ioctl()), @uarg does not
1022                  * point to user space memory for FID2MDTIDX. */
1023                 *(__u32 *)uarg = mdt_index;
1024                 break;
1025         }
1026         case OBD_IOC_FID2PATH: {
1027                 rc = lmv_fid2path(exp, len, karg, uarg);
1028                 break;
1029         }
1030         case LL_IOC_HSM_STATE_GET:
1031         case LL_IOC_HSM_STATE_SET:
1032         case LL_IOC_HSM_ACTION: {
1033                 struct md_op_data       *op_data = karg;
1034
1035                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
1036                 if (IS_ERR(tgt))
1037                         RETURN(PTR_ERR(tgt));
1038
1039                 if (tgt->ltd_exp == NULL)
1040                         RETURN(-EINVAL);
1041
1042                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1043                 break;
1044         }
1045         case LL_IOC_HSM_PROGRESS: {
1046                 const struct hsm_progress_kernel *hpk = karg;
1047
1048                 tgt = lmv_find_target(lmv, &hpk->hpk_fid);
1049                 if (IS_ERR(tgt))
1050                         RETURN(PTR_ERR(tgt));
1051                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1052                 break;
1053         }
1054         case LL_IOC_HSM_REQUEST: {
1055                 struct hsm_user_request *hur = karg;
1056                 unsigned int reqcount = hur->hur_request.hr_itemcount;
1057
1058                 if (reqcount == 0)
1059                         RETURN(0);
1060
1061                 /* if the request is about a single fid
1062                  * or if there is a single MDS, no need to split
1063                  * the request. */
1064                 if (reqcount == 1 || count == 1) {
1065                         tgt = lmv_find_target(lmv,
1066                                               &hur->hur_user_item[0].hui_fid);
1067                         if (IS_ERR(tgt))
1068                                 RETURN(PTR_ERR(tgt));
1069                         rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1070                 } else {
1071                         /* split fid list to their respective MDS */
1072                         for (i = 0; i < count; i++) {
1073                                 int nr, rc1;
1074                                 size_t reqlen;
1075                                 struct hsm_user_request *req;
1076
1077                                 tgt = lmv->tgts[i];
1078                                 if (tgt == NULL || tgt->ltd_exp == NULL)
1079                                         continue;
1080
1081                                 nr = lmv_hsm_req_count(lmv, hur, tgt);
1082                                 if (nr < 0)
1083                                         RETURN(nr);
1084                                 if (nr == 0) /* nothing for this MDS */
1085                                         continue;
1086
1087                                 /* build a request with fids for this MDS */
1088                                 reqlen = offsetof(typeof(*hur),
1089                                                   hur_user_item[nr])
1090                                                 + hur->hur_request.hr_data_len;
1091                                 OBD_ALLOC_LARGE(req, reqlen);
1092                                 if (req == NULL)
1093                                         RETURN(-ENOMEM);
1094                                 rc1 = lmv_hsm_req_build(lmv, hur, tgt, req);
1095                                 if (rc1 < 0)
1096                                         GOTO(hsm_req_err, rc1);
1097                                 rc1 = obd_iocontrol(cmd, tgt->ltd_exp, reqlen,
1098                                                     req, uarg);
1099 hsm_req_err:
1100                                 if (rc1 != 0 && rc == 0)
1101                                         rc = rc1;
1102                                 OBD_FREE_LARGE(req, reqlen);
1103                         }
1104                 }
1105                 break;
1106         }
1107         case LL_IOC_LOV_SWAP_LAYOUTS: {
1108                 struct md_op_data       *op_data = karg;
1109                 struct lmv_tgt_desc     *tgt1, *tgt2;
1110
1111                 tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
1112                 if (IS_ERR(tgt1))
1113                         RETURN(PTR_ERR(tgt1));
1114
1115                 tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
1116                 if (IS_ERR(tgt2))
1117                         RETURN(PTR_ERR(tgt2));
1118
1119                 if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
1120                         RETURN(-EINVAL);
1121
1122                 /* only files on same MDT can have their layouts swapped */
1123                 if (tgt1->ltd_idx != tgt2->ltd_idx)
1124                         RETURN(-EPERM);
1125
1126                 rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1127                 break;
1128         }
1129         case LL_IOC_HSM_CT_START: {
1130                 struct lustre_kernelcomm *lk = karg;
1131                 if (lk->lk_flags & LK_FLG_STOP)
1132                         rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg);
1133                 else
1134                         rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg);
1135                 break;
1136         }
1137         default:
1138                 for (i = 0; i < count; i++) {
1139                         struct obd_device *mdc_obd;
1140                         int err;
1141
1142                         tgt = lmv->tgts[i];
1143                         if (tgt == NULL || tgt->ltd_exp == NULL)
1144                                 continue;
1145                         /* ll_umount_begin() sets force flag but for lmv, not
1146                          * mdc. Let's pass it through */
1147                         mdc_obd = class_exp2obd(tgt->ltd_exp);
1148                         mdc_obd->obd_force = obddev->obd_force;
1149                         err = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1150                         if (err) {
1151                                 if (tgt->ltd_active) {
1152                                         CERROR("error: iocontrol MDC %s on MDT"
1153                                                " idx %d cmd %x: err = %d\n",
1154                                                tgt->ltd_uuid.uuid, i, cmd, err);
1155                                         if (!rc)
1156                                                 rc = err;
1157                                 }
1158                         } else
1159                                 set = 1;
1160                 }
1161                 if (!set && !rc)
1162                         rc = -EIO;
1163         }
1164         RETURN(rc);
1165 }
1166
1167 /**
1168  * This is _inode_ placement policy function (not name).
1169  */
1170 static int lmv_placement_policy(struct obd_device *obd,
1171                                 struct md_op_data *op_data, u32 *mds)
1172 {
1173         struct lmv_obd          *lmv = &obd->u.lmv;
1174         ENTRY;
1175
1176         LASSERT(mds != NULL);
1177
1178         if (lmv->desc.ld_tgt_count == 1) {
1179                 *mds = 0;
1180                 RETURN(0);
1181         }
1182
1183         if (op_data->op_default_stripe_offset != -1) {
1184                 *mds = op_data->op_default_stripe_offset;
1185                 RETURN(0);
1186         }
1187
1188         /**
1189          * If stripe_offset is provided during setdirstripe
1190          * (setdirstripe -i xx), xx MDS will be choosen.
1191          */
1192         if (op_data->op_cli_flags & CLI_SET_MEA && op_data->op_data != NULL) {
1193                 struct lmv_user_md *lum;
1194
1195                 lum = op_data->op_data;
1196
1197                 if (le32_to_cpu(lum->lum_stripe_offset) != (__u32)-1) {
1198                         *mds = le32_to_cpu(lum->lum_stripe_offset);
1199                 } else {
1200                         /* -1 means default, which will be in the same MDT with
1201                          * the stripe */
1202                         *mds = op_data->op_mds;
1203                         lum->lum_stripe_offset = cpu_to_le32(op_data->op_mds);
1204                 }
1205         } else {
1206                 /* Allocate new fid on target according to operation type and
1207                  * parent home mds. */
1208                 *mds = op_data->op_mds;
1209         }
1210
1211         RETURN(0);
1212 }
1213
1214 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, u32 mds)
1215 {
1216         struct lmv_tgt_desc     *tgt;
1217         int                      rc;
1218         ENTRY;
1219
1220         tgt = lmv_get_target(lmv, mds, NULL);
1221         if (IS_ERR(tgt))
1222                 RETURN(PTR_ERR(tgt));
1223
1224         /*
1225          * New seq alloc and FLD setup should be atomic. Otherwise we may find
1226          * on server that seq in new allocated fid is not yet known.
1227          */
1228         mutex_lock(&tgt->ltd_fid_mutex);
1229
1230         if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL)
1231                 GOTO(out, rc = -ENODEV);
1232
1233         /*
1234          * Asking underlying tgt layer to allocate new fid.
1235          */
1236         rc = obd_fid_alloc(NULL, tgt->ltd_exp, fid, NULL);
1237         if (rc > 0) {
1238                 LASSERT(fid_is_sane(fid));
1239                 rc = 0;
1240         }
1241
1242         EXIT;
1243 out:
1244         mutex_unlock(&tgt->ltd_fid_mutex);
1245         return rc;
1246 }
1247
1248 int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp,
1249                   struct lu_fid *fid, struct md_op_data *op_data)
1250 {
1251         struct obd_device     *obd = class_exp2obd(exp);
1252         struct lmv_obd        *lmv = &obd->u.lmv;
1253         u32                    mds = 0;
1254         int                    rc;
1255         ENTRY;
1256
1257         LASSERT(op_data != NULL);
1258         LASSERT(fid != NULL);
1259
1260         rc = lmv_placement_policy(obd, op_data, &mds);
1261         if (rc) {
1262                 CERROR("Can't get target for allocating fid, "
1263                        "rc %d\n", rc);
1264                 RETURN(rc);
1265         }
1266
1267         rc = __lmv_fid_alloc(lmv, fid, mds);
1268         if (rc) {
1269                 CERROR("Can't alloc new fid, rc %d\n", rc);
1270                 RETURN(rc);
1271         }
1272
1273         RETURN(rc);
1274 }
1275
1276 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1277 {
1278         struct lmv_obd  *lmv = &obd->u.lmv;
1279         struct lmv_desc *desc;
1280         int             rc;
1281         ENTRY;
1282
1283         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1284                 CERROR("LMV setup requires a descriptor\n");
1285                 RETURN(-EINVAL);
1286         }
1287
1288         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1289         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1290                 CERROR("Lmv descriptor size wrong: %d > %d\n",
1291                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1292                 RETURN(-EINVAL);
1293         }
1294
1295         lmv->tgts_size = 32U;
1296         OBD_ALLOC(lmv->tgts, sizeof(*lmv->tgts) * lmv->tgts_size);
1297         if (lmv->tgts == NULL)
1298                 RETURN(-ENOMEM);
1299
1300         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1301         lmv->desc.ld_tgt_count = 0;
1302         lmv->desc.ld_active_tgt_count = 0;
1303         lmv->max_def_easize = 0;
1304         lmv->max_easize = 0;
1305
1306         spin_lock_init(&lmv->lmv_lock);
1307         mutex_init(&lmv->lmv_init_mutex);
1308
1309 #ifdef CONFIG_PROC_FS
1310         obd->obd_vars = lprocfs_lmv_obd_vars;
1311         lprocfs_obd_setup(obd);
1312         lprocfs_alloc_md_stats(obd, 0);
1313         rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
1314                                 0444, &lmv_proc_target_fops, obd);
1315         if (rc)
1316                 CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1317                       obd->obd_name, rc);
1318 #endif
1319         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1320                              LUSTRE_CLI_FLD_HASH_DHT);
1321         if (rc) {
1322                 CERROR("Can't init FLD, err %d\n", rc);
1323                 GOTO(out, rc);
1324         }
1325
1326         RETURN(0);
1327
1328 out:
1329         return rc;
1330 }
1331
1332 static int lmv_cleanup(struct obd_device *obd)
1333 {
1334         struct lmv_obd   *lmv = &obd->u.lmv;
1335         ENTRY;
1336
1337         fld_client_fini(&lmv->lmv_fld);
1338         if (lmv->tgts != NULL) {
1339                 int i;
1340                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1341                         if (lmv->tgts[i] == NULL)
1342                                 continue;
1343                         lmv_del_target(lmv, i);
1344                 }
1345                 OBD_FREE(lmv->tgts, sizeof(*lmv->tgts) * lmv->tgts_size);
1346                 lmv->tgts_size = 0;
1347         }
1348         RETURN(0);
1349 }
1350
1351 static int lmv_process_config(struct obd_device *obd, size_t len, void *buf)
1352 {
1353         struct lustre_cfg       *lcfg = buf;
1354         struct obd_uuid         obd_uuid;
1355         int                     gen;
1356         __u32                   index;
1357         int                     rc;
1358         ENTRY;
1359
1360         switch (lcfg->lcfg_command) {
1361         case LCFG_ADD_MDC:
1362                 /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
1363                  * 2:0  3:1  4:lustre-MDT0000-mdc_UUID */
1364                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
1365                         GOTO(out, rc = -EINVAL);
1366
1367                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
1368
1369                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%u", &index) != 1)
1370                         GOTO(out, rc = -EINVAL);
1371                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
1372                         GOTO(out, rc = -EINVAL);
1373                 rc = lmv_add_target(obd, &obd_uuid, index, gen);
1374                 GOTO(out, rc);
1375         default:
1376                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1377                 GOTO(out, rc = -EINVAL);
1378         }
1379 out:
1380         RETURN(rc);
1381 }
1382
1383 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1384                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1385 {
1386         struct obd_device       *obd = class_exp2obd(exp);
1387         struct lmv_obd          *lmv = &obd->u.lmv;
1388         struct obd_statfs       *temp;
1389         int                      rc = 0;
1390         __u32                    i;
1391         ENTRY;
1392
1393         OBD_ALLOC(temp, sizeof(*temp));
1394         if (temp == NULL)
1395                 RETURN(-ENOMEM);
1396
1397         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1398                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1399                         continue;
1400
1401                 rc = obd_statfs(env, lmv->tgts[i]->ltd_exp, temp,
1402                                 max_age, flags);
1403                 if (rc) {
1404                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
1405                                lmv->tgts[i]->ltd_exp->exp_obd->obd_name,
1406                                rc);
1407                         GOTO(out_free_temp, rc);
1408                 }
1409
1410                 if (i == 0) {
1411                         *osfs = *temp;
1412                         /* If the statfs is from mount, it will needs
1413                          * retrieve necessary information from MDT0.
1414                          * i.e. mount does not need the merged osfs
1415                          * from all of MDT.
1416                          * And also clients can be mounted as long as
1417                          * MDT0 is in service*/
1418                         if (flags & OBD_STATFS_FOR_MDT0)
1419                                 GOTO(out_free_temp, rc);
1420                 } else {
1421                         osfs->os_bavail += temp->os_bavail;
1422                         osfs->os_blocks += temp->os_blocks;
1423                         osfs->os_ffree += temp->os_ffree;
1424                         osfs->os_files += temp->os_files;
1425                 }
1426         }
1427
1428         EXIT;
1429 out_free_temp:
1430         OBD_FREE(temp, sizeof(*temp));
1431         return rc;
1432 }
1433
1434 static int lmv_get_root(struct obd_export *exp, const char *fileset,
1435                         struct lu_fid *fid)
1436 {
1437         struct obd_device    *obd = exp->exp_obd;
1438         struct lmv_obd       *lmv = &obd->u.lmv;
1439         int                   rc;
1440         ENTRY;
1441
1442         rc = md_get_root(lmv->tgts[0]->ltd_exp, fileset, fid);
1443         RETURN(rc);
1444 }
1445
1446 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1447                         u64 valid, const char *name,
1448                         const char *input, int input_size, int output_size,
1449                         int flags, struct ptlrpc_request **request)
1450 {
1451         struct obd_device      *obd = exp->exp_obd;
1452         struct lmv_obd         *lmv = &obd->u.lmv;
1453         struct lmv_tgt_desc    *tgt;
1454         int                     rc;
1455         ENTRY;
1456
1457         tgt = lmv_find_target(lmv, fid);
1458         if (IS_ERR(tgt))
1459                 RETURN(PTR_ERR(tgt));
1460
1461         rc = md_getxattr(tgt->ltd_exp, fid, valid, name, input,
1462                          input_size, output_size, flags, request);
1463
1464         RETURN(rc);
1465 }
1466
1467 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1468                         u64 valid, const char *name,
1469                         const char *input, int input_size, int output_size,
1470                         int flags, __u32 suppgid,
1471                         struct ptlrpc_request **request)
1472 {
1473         struct obd_device      *obd = exp->exp_obd;
1474         struct lmv_obd         *lmv = &obd->u.lmv;
1475         struct lmv_tgt_desc    *tgt;
1476         int                     rc;
1477         ENTRY;
1478
1479         tgt = lmv_find_target(lmv, fid);
1480         if (IS_ERR(tgt))
1481                 RETURN(PTR_ERR(tgt));
1482
1483         rc = md_setxattr(tgt->ltd_exp, fid, valid, name, input,
1484                          input_size, output_size, flags, suppgid,
1485                          request);
1486
1487         RETURN(rc);
1488 }
1489
1490 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1491                        struct ptlrpc_request **request)
1492 {
1493         struct obd_device       *obd = exp->exp_obd;
1494         struct lmv_obd          *lmv = &obd->u.lmv;
1495         struct lmv_tgt_desc     *tgt;
1496         int                      rc;
1497         ENTRY;
1498
1499         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1500         if (IS_ERR(tgt))
1501                 RETURN(PTR_ERR(tgt));
1502
1503         if (op_data->op_flags & MF_GET_MDT_IDX) {
1504                 op_data->op_mds = tgt->ltd_idx;
1505                 RETURN(0);
1506         }
1507
1508         rc = md_getattr(tgt->ltd_exp, op_data, request);
1509
1510         RETURN(rc);
1511 }
1512
1513 static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1514 {
1515         struct obd_device   *obd = exp->exp_obd;
1516         struct lmv_obd      *lmv = &obd->u.lmv;
1517         __u32                i;
1518         ENTRY;
1519
1520         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1521
1522         /*
1523          * With DNE every object can have two locks in different namespaces:
1524          * lookup lock in space of MDT storing direntry and update/open lock in
1525          * space of MDT storing inode.
1526          */
1527         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1528                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1529                         continue;
1530                 md_null_inode(lmv->tgts[i]->ltd_exp, fid);
1531         }
1532
1533         RETURN(0);
1534 }
1535
1536 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1537                      struct md_open_data *mod, struct ptlrpc_request **request)
1538 {
1539         struct obd_device     *obd = exp->exp_obd;
1540         struct lmv_obd        *lmv = &obd->u.lmv;
1541         struct lmv_tgt_desc   *tgt;
1542         int                    rc;
1543         ENTRY;
1544
1545         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1546         if (IS_ERR(tgt))
1547                 RETURN(PTR_ERR(tgt));
1548
1549         CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1550         rc = md_close(tgt->ltd_exp, op_data, mod, request);
1551         RETURN(rc);
1552 }
1553
1554 /**
1555  * Choosing the MDT by name or FID in @op_data.
1556  * For non-striped directory, it will locate MDT by fid.
1557  * For striped-directory, it will locate MDT by name. And also
1558  * it will reset op_fid1 with the FID of the choosen stripe.
1559  **/
1560 struct lmv_tgt_desc *
1561 lmv_locate_target_for_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm,
1562                            const char *name, int namelen, struct lu_fid *fid,
1563                            u32 *mds)
1564 {
1565         struct lmv_tgt_desc     *tgt;
1566         const struct lmv_oinfo  *oinfo;
1567
1568         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NAME_HASH)) {
1569                 if (cfs_fail_val >= lsm->lsm_md_stripe_count)
1570                         RETURN(ERR_PTR(-EBADF));
1571                 oinfo = &lsm->lsm_md_oinfo[cfs_fail_val];
1572         } else {
1573                 oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
1574                 if (IS_ERR(oinfo))
1575                         RETURN(ERR_CAST(oinfo));
1576         }
1577
1578         if (fid != NULL)
1579                 *fid = oinfo->lmo_fid;
1580         if (mds != NULL)
1581                 *mds = oinfo->lmo_mds;
1582
1583         tgt = lmv_get_target(lmv, oinfo->lmo_mds, NULL);
1584
1585         CDEBUG(D_INFO, "locate on mds %u "DFID"\n", oinfo->lmo_mds,
1586                PFID(&oinfo->lmo_fid));
1587         return tgt;
1588 }
1589
1590 /**
1591  * Locate mds by fid or name
1592  *
1593  * For striped directory (lsm != NULL), it will locate the stripe
1594  * by name hash (see lsm_name_to_stripe_info()). Note: if the hash_type
1595  * is unknown, it will return -EBADFD, and lmv_intent_lookup might need
1596  * walk through all of stripes to locate the entry.
1597  *
1598  * For normal direcotry, it will locate MDS by FID directly.
1599  * \param[in] lmv       LMV device
1600  * \param[in] op_data   client MD stack parameters, name, namelen
1601  *                      mds_num etc.
1602  * \param[in] fid       object FID used to locate MDS.
1603  *
1604  * retval               pointer to the lmv_tgt_desc if succeed.
1605  *                      ERR_PTR(errno) if failed.
1606  */
1607 struct lmv_tgt_desc*
1608 lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1609                struct lu_fid *fid)
1610 {
1611         struct lmv_stripe_md    *lsm = op_data->op_mea1;
1612         struct lmv_tgt_desc     *tgt;
1613
1614         /* During creating VOLATILE file, it should honor the mdt
1615          * index if the file under striped dir is being restored, see
1616          * ct_restore(). */
1617         if (op_data->op_bias & MDS_CREATE_VOLATILE &&
1618             (int)op_data->op_mds != -1) {
1619                 int i;
1620                 tgt = lmv_get_target(lmv, op_data->op_mds, NULL);
1621                 if (IS_ERR(tgt))
1622                         return tgt;
1623
1624                 if (lsm != NULL) {
1625                         /* refill the right parent fid */
1626                         for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
1627                                 struct lmv_oinfo *oinfo;
1628
1629                                 oinfo = &lsm->lsm_md_oinfo[i];
1630                                 if (oinfo->lmo_mds == op_data->op_mds) {
1631                                         *fid = oinfo->lmo_fid;
1632                                         break;
1633                                 }
1634                         }
1635
1636                         if (i == lsm->lsm_md_stripe_count)
1637                                 *fid = lsm->lsm_md_oinfo[0].lmo_fid;
1638                 }
1639
1640                 return tgt;
1641         }
1642
1643         if (lsm == NULL || op_data->op_namelen == 0) {
1644                 tgt = lmv_find_target(lmv, fid);
1645                 if (IS_ERR(tgt))
1646                         return tgt;
1647
1648                 op_data->op_mds = tgt->ltd_idx;
1649                 return tgt;
1650         }
1651
1652         return lmv_locate_target_for_name(lmv, lsm, op_data->op_name,
1653                                           op_data->op_namelen, fid,
1654                                           &op_data->op_mds);
1655 }
1656
1657 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1658                 const void *data, size_t datalen, umode_t mode, uid_t uid,
1659                 gid_t gid, cfs_cap_t cap_effective, __u64 rdev,
1660                 struct ptlrpc_request **request)
1661 {
1662         struct obd_device       *obd = exp->exp_obd;
1663         struct lmv_obd          *lmv = &obd->u.lmv;
1664         struct lmv_tgt_desc     *tgt;
1665         int                      rc;
1666         ENTRY;
1667
1668         if (!lmv->desc.ld_active_tgt_count)
1669                 RETURN(-EIO);
1670
1671         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1672         if (IS_ERR(tgt))
1673                 RETURN(PTR_ERR(tgt));
1674
1675         CDEBUG(D_INODE, "CREATE name '%.*s' on "DFID" -> mds #%x\n",
1676                 (int)op_data->op_namelen, op_data->op_name,
1677                 PFID(&op_data->op_fid1), op_data->op_mds);
1678
1679         rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1680         if (rc)
1681                 RETURN(rc);
1682         if (exp_connect_flags(exp) & OBD_CONNECT_DIR_STRIPE) {
1683                 /* Send the create request to the MDT where the object
1684                  * will be located */
1685                 tgt = lmv_find_target(lmv, &op_data->op_fid2);
1686                 if (IS_ERR(tgt))
1687                         RETURN(PTR_ERR(tgt));
1688
1689                 op_data->op_mds = tgt->ltd_idx;
1690         } else {
1691                 CDEBUG(D_CONFIG, "Server doesn't support striped dirs\n");
1692         }
1693
1694         CDEBUG(D_INODE, "CREATE obj "DFID" -> mds #%x\n",
1695                PFID(&op_data->op_fid2), op_data->op_mds);
1696
1697         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1698         rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
1699                        cap_effective, rdev, request);
1700         if (rc == 0) {
1701                 if (*request == NULL)
1702                         RETURN(rc);
1703                 CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
1704         }
1705         RETURN(rc);
1706 }
1707
1708 static int
1709 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1710             const union ldlm_policy_data *policy, struct md_op_data *op_data,
1711             struct lustre_handle *lockh, __u64 extra_lock_flags)
1712 {
1713         struct obd_device        *obd = exp->exp_obd;
1714         struct lmv_obd           *lmv = &obd->u.lmv;
1715         struct lmv_tgt_desc      *tgt;
1716         int                       rc;
1717         ENTRY;
1718
1719         CDEBUG(D_INODE, "ENQUEUE on "DFID"\n", PFID(&op_data->op_fid1));
1720
1721         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1722         if (IS_ERR(tgt))
1723                 RETURN(PTR_ERR(tgt));
1724
1725         CDEBUG(D_INODE, "ENQUEUE on "DFID" -> mds #%u\n",
1726                PFID(&op_data->op_fid1), tgt->ltd_idx);
1727
1728         rc = md_enqueue(tgt->ltd_exp, einfo, policy, op_data, lockh,
1729                         extra_lock_flags);
1730
1731         RETURN(rc);
1732 }
1733
1734 static int
1735 lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
1736                  struct ptlrpc_request **preq)
1737 {
1738         struct ptlrpc_request   *req = NULL;
1739         struct obd_device       *obd = exp->exp_obd;
1740         struct lmv_obd          *lmv = &obd->u.lmv;
1741         struct lmv_tgt_desc     *tgt;
1742         struct mdt_body         *body;
1743         int                      rc;
1744         ENTRY;
1745
1746         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1747         if (IS_ERR(tgt))
1748                 RETURN(PTR_ERR(tgt));
1749
1750         CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
1751                 (int)op_data->op_namelen, op_data->op_name,
1752                 PFID(&op_data->op_fid1), tgt->ltd_idx);
1753
1754         rc = md_getattr_name(tgt->ltd_exp, op_data, preq);
1755         if (rc != 0)
1756                 RETURN(rc);
1757
1758         body = req_capsule_server_get(&(*preq)->rq_pill, &RMF_MDT_BODY);
1759         LASSERT(body != NULL);
1760
1761         if (body->mbo_valid & OBD_MD_MDS) {
1762                 struct lu_fid rid = body->mbo_fid1;
1763                 CDEBUG(D_INODE, "Request attrs for "DFID"\n",
1764                        PFID(&rid));
1765
1766                 tgt = lmv_find_target(lmv, &rid);
1767                 if (IS_ERR(tgt)) {
1768                         ptlrpc_req_finished(*preq);
1769                         preq = NULL;
1770                         RETURN(PTR_ERR(tgt));
1771                 }
1772
1773                 op_data->op_fid1 = rid;
1774                 op_data->op_valid |= OBD_MD_FLCROSSREF;
1775                 op_data->op_namelen = 0;
1776                 op_data->op_name = NULL;
1777                 rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1778                 ptlrpc_req_finished(*preq);
1779                 *preq = req;
1780         }
1781
1782         RETURN(rc);
1783 }
1784
/*
 * Map one MF_MDC_CANCEL_FIDn flag to the address of the matching op_fidN
 * member of \a op_data; evaluates to NULL for any other value of \a fl.
 *
 * Both arguments are parenthesized so that arbitrary expressions (e.g.
 * "ptr + 1" or "a | b") expand correctly. \a fl may be evaluated more than
 * once, so it must not have side effects.
 */
#define md_op_data_fid(op_data, fl)                             \
        ((fl) == MF_MDC_CANCEL_FID1 ? &(op_data)->op_fid1 :     \
         (fl) == MF_MDC_CANCEL_FID2 ? &(op_data)->op_fid2 :     \
         (fl) == MF_MDC_CANCEL_FID3 ? &(op_data)->op_fid3 :     \
         (fl) == MF_MDC_CANCEL_FID4 ? &(op_data)->op_fid4 :     \
         NULL)
1791
1792 static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt,
1793                             struct md_op_data *op_data, __u32 op_tgt,
1794                             enum ldlm_mode mode, int bits, int flag)
1795 {
1796         struct lu_fid *fid = md_op_data_fid(op_data, flag);
1797         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
1798         union ldlm_policy_data policy = { { 0 } };
1799         int rc = 0;
1800         ENTRY;
1801
1802         if (!fid_is_sane(fid))
1803                 RETURN(0);
1804
1805         if (tgt == NULL) {
1806                 tgt = lmv_find_target(lmv, fid);
1807                 if (IS_ERR(tgt))
1808                         RETURN(PTR_ERR(tgt));
1809         }
1810
1811         if (tgt->ltd_idx != op_tgt) {
1812                 CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
1813                 policy.l_inodebits.bits = bits;
1814                 rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
1815                                       mode, LCF_ASYNC, NULL);
1816         } else {
1817                 CDEBUG(D_INODE,
1818                        "EARLY_CANCEL skip operation target %d on "DFID"\n",
1819                        op_tgt, PFID(fid));
1820                 op_data->op_flags |= flag;
1821                 rc = 0;
1822         }
1823
1824         RETURN(rc);
1825 }
1826
1827 /*
1828  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1829  * op_data->op_fid2
1830  */
1831 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1832                     struct ptlrpc_request **request)
1833 {
1834         struct obd_device       *obd = exp->exp_obd;
1835         struct lmv_obd          *lmv = &obd->u.lmv;
1836         struct lmv_tgt_desc     *tgt;
1837         int                      rc;
1838         ENTRY;
1839
1840         LASSERT(op_data->op_namelen != 0);
1841
1842         CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
1843                PFID(&op_data->op_fid2), (int)op_data->op_namelen,
1844                op_data->op_name, PFID(&op_data->op_fid1));
1845
1846         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1847         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1848         op_data->op_cap = cfs_curproc_cap_pack();
1849         if (op_data->op_mea2 != NULL) {
1850                 struct lmv_stripe_md    *lsm = op_data->op_mea2;
1851                 const struct lmv_oinfo  *oinfo;
1852
1853                 oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
1854                                                 op_data->op_namelen);
1855                 if (IS_ERR(oinfo))
1856                         RETURN(PTR_ERR(oinfo));
1857
1858                 op_data->op_fid2 = oinfo->lmo_fid;
1859         }
1860
1861         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1862         if (IS_ERR(tgt))
1863                 RETURN(PTR_ERR(tgt));
1864
1865         /*
1866          * Cancel UPDATE lock on child (fid1).
1867          */
1868         op_data->op_flags |= MF_MDC_CANCEL_FID2;
1869         rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
1870                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
1871         if (rc != 0)
1872                 RETURN(rc);
1873
1874         rc = md_link(tgt->ltd_exp, op_data, request);
1875
1876         RETURN(rc);
1877 }
1878
1879 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1880                       const char *old, size_t oldlen,
1881                       const char *new, size_t newlen,
1882                       struct ptlrpc_request **request)
1883 {
1884         struct obd_device       *obd = exp->exp_obd;
1885         struct lmv_obd          *lmv = &obd->u.lmv;
1886         struct lmv_tgt_desc     *src_tgt;
1887         struct lmv_tgt_desc     *tgt_tgt;
1888         struct obd_export       *target_exp;
1889         struct mdt_body         *body;
1890         int                     rc;
1891         ENTRY;
1892
1893         LASSERT(oldlen != 0);
1894
1895         CDEBUG(D_INODE, "RENAME %.*s in "DFID":%d to %.*s in "DFID":%d\n",
1896                (int)oldlen, old, PFID(&op_data->op_fid1),
1897                op_data->op_mea1 ? op_data->op_mea1->lsm_md_stripe_count : 0,
1898                (int)newlen, new, PFID(&op_data->op_fid2),
1899                op_data->op_mea2 ? op_data->op_mea2->lsm_md_stripe_count : 0);
1900
1901         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1902         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1903         op_data->op_cap = cfs_curproc_cap_pack();
1904         if (op_data->op_cli_flags & CLI_MIGRATE) {
1905                 LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n",
1906                          PFID(&op_data->op_fid3));
1907
1908                 if (op_data->op_mea1 != NULL) {
1909                         struct lmv_stripe_md    *lsm = op_data->op_mea1;
1910                         struct lmv_tgt_desc     *tmp;
1911
1912                         /* Fix the parent fid for striped dir */
1913                         tmp = lmv_locate_target_for_name(lmv, lsm, old,
1914                                                          oldlen,
1915                                                          &op_data->op_fid1,
1916                                                          NULL);
1917                         if (IS_ERR(tmp))
1918                                 RETURN(PTR_ERR(tmp));
1919                 }
1920
1921                 rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1922                 if (rc != 0)
1923                         RETURN(rc);
1924
1925                 src_tgt = lmv_find_target(lmv, &op_data->op_fid3);
1926                 if (IS_ERR(src_tgt))
1927                         RETURN(PTR_ERR(src_tgt));
1928
1929                 target_exp = src_tgt->ltd_exp;
1930         } else {
1931                 if (op_data->op_mea1 != NULL) {
1932                         struct lmv_stripe_md    *lsm = op_data->op_mea1;
1933
1934                         src_tgt = lmv_locate_target_for_name(lmv, lsm, old,
1935                                                              oldlen,
1936                                                              &op_data->op_fid1,
1937                                                              &op_data->op_mds);
1938                 } else {
1939                         src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
1940                 }
1941                 if (IS_ERR(src_tgt))
1942                         RETURN(PTR_ERR(src_tgt));
1943
1944
1945                 if (op_data->op_mea2 != NULL) {
1946                         struct lmv_stripe_md    *lsm = op_data->op_mea2;
1947
1948                         tgt_tgt = lmv_locate_target_for_name(lmv, lsm, new,
1949                                                              newlen,
1950                                                              &op_data->op_fid2,
1951                                                              &op_data->op_mds);
1952                 } else {
1953                         tgt_tgt = lmv_find_target(lmv, &op_data->op_fid2);
1954
1955                 }
1956                 if (IS_ERR(tgt_tgt))
1957                         RETURN(PTR_ERR(tgt_tgt));
1958
1959                 target_exp = tgt_tgt->ltd_exp;
1960         }
1961
1962         /*
1963          * LOOKUP lock on src child (fid3) should also be cancelled for
1964          * src_tgt in mdc_rename.
1965          */
1966         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1967
1968         /*
1969          * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
1970          * own target.
1971          */
1972         rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
1973                               LCK_EX, MDS_INODELOCK_UPDATE,
1974                               MF_MDC_CANCEL_FID2);
1975
1976         if (rc != 0)
1977                 RETURN(rc);
1978         /*
1979          * Cancel LOOKUP locks on source child (fid3) for parent tgt_tgt.
1980          */
1981         if (fid_is_sane(&op_data->op_fid3)) {
1982                 struct lmv_tgt_desc *tgt;
1983
1984                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
1985                 if (IS_ERR(tgt))
1986                         RETURN(PTR_ERR(tgt));
1987
1988                 /* Cancel LOOKUP lock on its parent */
1989                 rc = lmv_early_cancel(exp, tgt, op_data, src_tgt->ltd_idx,
1990                                       LCK_EX, MDS_INODELOCK_LOOKUP,
1991                                       MF_MDC_CANCEL_FID3);
1992                 if (rc != 0)
1993                         RETURN(rc);
1994
1995                 rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
1996                                       LCK_EX, MDS_INODELOCK_FULL,
1997                                       MF_MDC_CANCEL_FID3);
1998                 if (rc != 0)
1999                         RETURN(rc);
2000         }
2001
2002 retry_rename:
2003         /*
2004          * Cancel all the locks on tgt child (fid4).
2005          */
2006         if (fid_is_sane(&op_data->op_fid4)) {
2007                 struct lmv_tgt_desc *tgt;
2008
2009                 rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
2010                                       LCK_EX, MDS_INODELOCK_FULL,
2011                                       MF_MDC_CANCEL_FID4);
2012                 if (rc != 0)
2013                         RETURN(rc);
2014
2015                 tgt = lmv_find_target(lmv, &op_data->op_fid4);
2016                 if (IS_ERR(tgt))
2017                         RETURN(PTR_ERR(tgt));
2018
2019                 /* Since the target child might be destroyed, and it might
2020                  * become orphan, and we can only check orphan on the local
2021                  * MDT right now, so we send rename request to the MDT where
2022                  * target child is located. If target child does not exist,
2023                  * then it will send the request to the target parent */
2024                 target_exp = tgt->ltd_exp;
2025         }
2026
2027         rc = md_rename(target_exp, op_data, old, oldlen, new, newlen,
2028                        request);
2029
2030         if (rc != 0 && rc != -EXDEV)
2031                 RETURN(rc);
2032
2033         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2034         if (body == NULL)
2035                 RETURN(-EPROTO);
2036
2037         /* Not cross-ref case, just get out of here. */
2038         if (likely(!(body->mbo_valid & OBD_MD_MDS)))
2039                 RETURN(rc);
2040
2041         CDEBUG(D_INODE, "%s: try rename to another MDT for "DFID"\n",
2042                exp->exp_obd->obd_name, PFID(&body->mbo_fid1));
2043
2044         op_data->op_fid4 = body->mbo_fid1;
2045         ptlrpc_req_finished(*request);
2046         *request = NULL;
2047         goto retry_rename;
2048 }
2049
2050 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
2051                        void *ea, size_t ealen, struct ptlrpc_request **request)
2052 {
2053         struct obd_device       *obd = exp->exp_obd;
2054         struct lmv_obd          *lmv = &obd->u.lmv;
2055         struct lmv_tgt_desc     *tgt;
2056         int                      rc = 0;
2057         ENTRY;
2058
2059         CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
2060                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
2061
2062         op_data->op_flags |= MF_MDC_CANCEL_FID1;
2063         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2064         if (IS_ERR(tgt))
2065                 RETURN(PTR_ERR(tgt));
2066
2067         rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, request);
2068
2069         RETURN(rc);
2070 }
2071
2072 static int lmv_fsync(struct obd_export *exp, const struct lu_fid *fid,
2073                      struct ptlrpc_request **request)
2074 {
2075         struct obd_device       *obd = exp->exp_obd;
2076         struct lmv_obd          *lmv = &obd->u.lmv;
2077         struct lmv_tgt_desc     *tgt;
2078         int                      rc;
2079         ENTRY;
2080
2081         tgt = lmv_find_target(lmv, fid);
2082         if (IS_ERR(tgt))
2083                 RETURN(PTR_ERR(tgt));
2084
2085         rc = md_fsync(tgt->ltd_exp, fid, request);
2086         RETURN(rc);
2087 }
2088
2089 /**
2090  * Get current minimum entry from striped directory
2091  *
2092  * This function will search the dir entry, whose hash value is the
2093  * closest(>=) to @hash_offset, from all of sub-stripes, and it is
2094  * only being called for striped directory.
2095  *
2096  * \param[in] exp               export of LMV
2097  * \param[in] op_data           parameters transferred beween client MD stack
2098  *                              stripe_information will be included in this
2099  *                              parameter
2100  * \param[in] cb_op             ldlm callback being used in enqueue in
2101  *                              mdc_read_page
2102  * \param[in] hash_offset       the hash value, which is used to locate
2103  *                              minum(closet) dir entry
2104  * \param[in|out] stripe_offset the caller use this to indicate the stripe
2105  *                              index of last entry, so to avoid hash conflict
2106  *                              between stripes. It will also be used to
2107  *                              return the stripe index of current dir entry.
2108  * \param[in|out] entp          the minum entry and it also is being used
2109  *                              to input the last dir entry to resolve the
2110  *                              hash conflict
2111  *
2112  * \param[out] ppage            the page which holds the minum entry
2113  *
2114  * \retval                      = 0 get the entry successfully
2115  *                              negative errno (< 0) does not get the entry
2116  */
2117 static int lmv_get_min_striped_entry(struct obd_export *exp,
2118                                      struct md_op_data *op_data,
2119                                      struct md_callback *cb_op,
2120                                      __u64 hash_offset, int *stripe_offset,
2121                                      struct lu_dirent **entp,
2122                                      struct page **ppage)
2123 {
2124         struct obd_device       *obd = exp->exp_obd;
2125         struct lmv_obd          *lmv = &obd->u.lmv;
2126         struct lmv_stripe_md    *lsm = op_data->op_mea1;
2127         struct lmv_tgt_desc     *tgt;
2128         int                     stripe_count;
2129         struct lu_dirent        *min_ent = NULL;
2130         struct page             *min_page = NULL;
2131         int                     min_idx = 0;
2132         int                     i;
2133         int                     rc = 0;
2134         ENTRY;
2135
2136         stripe_count = lsm->lsm_md_stripe_count;
2137         for (i = 0; i < stripe_count; i++) {
2138                 struct lu_dirent        *ent = NULL;
2139                 struct page             *page = NULL;
2140                 struct lu_dirpage       *dp;
2141                 __u64                   stripe_hash = hash_offset;
2142
2143                 tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds, NULL);
2144                 if (IS_ERR(tgt))
2145                         GOTO(out, rc = PTR_ERR(tgt));
2146
2147                 /* op_data will be shared by each stripe, so we need
2148                  * reset these value for each stripe */
2149                 op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
2150                 op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
2151                 op_data->op_data = lsm->lsm_md_oinfo[i].lmo_root;
2152 next:
2153                 rc = md_read_page(tgt->ltd_exp, op_data, cb_op, stripe_hash,
2154                                   &page);
2155                 if (rc != 0)
2156                         GOTO(out, rc);
2157
2158                 dp = page_address(page);
2159                 for (ent = lu_dirent_start(dp); ent != NULL;
2160                      ent = lu_dirent_next(ent)) {
2161                         /* Skip dummy entry */
2162                         if (le16_to_cpu(ent->lde_namelen) == 0)
2163                                 continue;
2164
2165                         if (le64_to_cpu(ent->lde_hash) < hash_offset)
2166                                 continue;
2167
2168                         if (le64_to_cpu(ent->lde_hash) == hash_offset &&
2169                             (*entp == ent || i < *stripe_offset))
2170                                 continue;
2171
2172                         /* skip . and .. for other stripes */
2173                         if (i != 0 &&
2174                             (strncmp(ent->lde_name, ".",
2175                                      le16_to_cpu(ent->lde_namelen)) == 0 ||
2176                              strncmp(ent->lde_name, "..",
2177                                      le16_to_cpu(ent->lde_namelen)) == 0))
2178                                 continue;
2179                         break;
2180                 }
2181
2182                 if (ent == NULL) {
2183                         stripe_hash = le64_to_cpu(dp->ldp_hash_end);
2184
2185                         kunmap(page);
2186                         page_cache_release(page);
2187                         page = NULL;
2188
2189                         /* reach the end of current stripe, go to next stripe */
2190                         if (stripe_hash == MDS_DIR_END_OFF)
2191                                 continue;
2192                         else
2193                                 goto next;
2194                 }
2195
2196                 if (min_ent != NULL) {
2197                         if (le64_to_cpu(min_ent->lde_hash) >
2198                             le64_to_cpu(ent->lde_hash)) {
2199                                 min_ent = ent;
2200                                 kunmap(min_page);
2201                                 page_cache_release(min_page);
2202                                 min_idx = i;
2203                                 min_page = page;
2204                         } else {
2205                                 kunmap(page);
2206                                 page_cache_release(page);
2207                                 page = NULL;
2208                         }
2209                 } else {
2210                         min_ent = ent;
2211                         min_page = page;
2212                         min_idx = i;
2213                 }
2214         }
2215
2216 out:
2217         if (*ppage != NULL) {
2218                 kunmap(*ppage);
2219                 page_cache_release(*ppage);
2220         }
2221         *stripe_offset = min_idx;
2222         *entp = min_ent;
2223         *ppage = min_page;
2224         RETURN(rc);
2225 }
2226
2227 /**
2228  * Build dir entry page from a striped directory
2229  *
2230  * This function gets one entry by @offset from a striped directory. It will
2231  * read entries from all of stripes, and choose one closest to the required
2232  * offset(&offset). A few notes
2233  * 1. skip . and .. for non-zero stripes, because there can only have one .
2234  * and .. in a directory.
2235  * 2. op_data will be shared by all of stripes, instead of allocating new
2236  * one, so need to restore before reusing.
2237  * 3. release the entry page if that is not being chosen.
2238  *
2239  * \param[in] exp       obd export refer to LMV
2240  * \param[in] op_data   hold those MD parameters of read_entry
2241  * \param[in] cb_op     ldlm callback being used in enqueue in mdc_read_entry
2242  * \param[out] ldp      the entry being read
2243  * \param[out] ppage    the page holding the entry. Note: because the entry
2244  *                      will be accessed in upper layer, so we need hold the
2245  *                      page until the usages of entry is finished, see
2246  *                      ll_dir_entry_next.
2247  *
2248  * retval               =0 if get entry successfully
2249  *                      <0 cannot get entry
2250  */
2251 static int lmv_read_striped_page(struct obd_export *exp,
2252                                  struct md_op_data *op_data,
2253                                  struct md_callback *cb_op,
2254                                  __u64 offset, struct page **ppage)
2255 {
2256         struct lu_fid           master_fid = op_data->op_fid1;
2257         struct inode            *master_inode = op_data->op_data;
2258         __u64                   hash_offset = offset;
2259         struct lu_dirpage       *dp;
2260         struct page             *min_ent_page = NULL;
2261         struct page             *ent_page = NULL;
2262         struct lu_dirent        *ent;
2263         void                    *area;
2264         int                     ent_idx = 0;
2265         struct lu_dirent        *min_ent = NULL;
2266         struct lu_dirent        *last_ent;
2267         size_t                  left_bytes;
2268         int                     rc;
2269         ENTRY;
2270
2271         /* Allocate a page and read entries from all of stripes and fill
2272          * the page by hash order */
2273         ent_page = alloc_page(GFP_KERNEL);
2274         if (ent_page == NULL)
2275                 RETURN(-ENOMEM);
2276
2277         /* Initialize the entry page */
2278         dp = kmap(ent_page);
2279         memset(dp, 0, sizeof(*dp));
2280         dp->ldp_hash_start = cpu_to_le64(offset);
2281         dp->ldp_flags |= LDF_COLLIDE;
2282
2283         area = dp + 1;
2284         left_bytes = PAGE_SIZE - sizeof(*dp);
2285         ent = area;
2286         last_ent = ent;
2287         do {
2288                 __u16   ent_size;
2289
2290                 /* Find the minum entry from all sub-stripes */
2291                 rc = lmv_get_min_striped_entry(exp, op_data, cb_op, hash_offset,
2292                                                &ent_idx, &min_ent,
2293                                                &min_ent_page);
2294                 if (rc != 0)
2295                         GOTO(out, rc);
2296
2297                 /* If it can not get minum entry, it means it already reaches
2298                  * the end of this directory */
2299                 if (min_ent == NULL) {
2300                         last_ent->lde_reclen = 0;
2301                         hash_offset = MDS_DIR_END_OFF;
2302                         GOTO(out, rc);
2303                 }
2304
2305                 ent_size = le16_to_cpu(min_ent->lde_reclen);
2306
2307                 /* the last entry lde_reclen is 0, but it might not
2308                  * the end of this entry of this temporay entry */
2309                 if (ent_size == 0)
2310                         ent_size = lu_dirent_calc_size(
2311                                         le16_to_cpu(min_ent->lde_namelen),
2312                                         le32_to_cpu(min_ent->lde_attrs));
2313                 if (ent_size > left_bytes) {
2314                         last_ent->lde_reclen = cpu_to_le16(0);
2315                         hash_offset = le64_to_cpu(min_ent->lde_hash);
2316                         GOTO(out, rc);
2317                 }
2318
2319                 memcpy(ent, min_ent, ent_size);
2320
2321                 /* Replace . with master FID and Replace .. with the parent FID
2322                  * of master object */
2323                 if (strncmp(ent->lde_name, ".",
2324                             le16_to_cpu(ent->lde_namelen)) == 0 &&
2325                     le16_to_cpu(ent->lde_namelen) == 1)
2326                         fid_cpu_to_le(&ent->lde_fid, &master_fid);
2327                 else if (strncmp(ent->lde_name, "..",
2328                                    le16_to_cpu(ent->lde_namelen)) == 0 &&
2329                            le16_to_cpu(ent->lde_namelen) == 2)
2330                         fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid3);
2331
2332                 left_bytes -= ent_size;
2333                 ent->lde_reclen = cpu_to_le16(ent_size);
2334                 last_ent = ent;
2335                 ent = (void *)ent + ent_size;
2336                 hash_offset = le64_to_cpu(min_ent->lde_hash);
2337                 if (hash_offset == MDS_DIR_END_OFF) {
2338                         last_ent->lde_reclen = 0;
2339                         break;
2340                 }
2341         } while (1);
2342 out:
2343         if (min_ent_page != NULL) {
2344                 kunmap(min_ent_page);
2345                 page_cache_release(min_ent_page);
2346         }
2347
2348         if (unlikely(rc != 0)) {
2349                 __free_page(ent_page);
2350                 ent_page = NULL;
2351         } else {
2352                 if (ent == area)
2353                         dp->ldp_flags |= LDF_EMPTY;
2354                 dp->ldp_flags = cpu_to_le32(dp->ldp_flags);
2355                 dp->ldp_hash_end = cpu_to_le64(hash_offset);
2356         }
2357
2358         /* We do not want to allocate md_op_data during each
2359          * dir entry reading, so op_data will be shared by every stripe,
2360          * then we need to restore it back to original value before
2361          * return to the upper layer */
2362         op_data->op_fid1 = master_fid;
2363         op_data->op_fid2 = master_fid;
2364         op_data->op_data = master_inode;
2365
2366         *ppage = ent_page;
2367
2368         RETURN(rc);
2369 }
2370
2371 int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data,
2372                   struct md_callback *cb_op, __u64 offset,
2373                   struct page **ppage)
2374 {
2375         struct obd_device       *obd = exp->exp_obd;
2376         struct lmv_obd          *lmv = &obd->u.lmv;
2377         struct lmv_stripe_md    *lsm = op_data->op_mea1;
2378         struct lmv_tgt_desc     *tgt;
2379         int                     rc;
2380         ENTRY;
2381
2382         if (unlikely(lsm != NULL)) {
2383                 rc = lmv_read_striped_page(exp, op_data, cb_op, offset, ppage);
2384                 RETURN(rc);
2385         }
2386
2387         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2388         if (IS_ERR(tgt))
2389                 RETURN(PTR_ERR(tgt));
2390
2391         rc = md_read_page(tgt->ltd_exp, op_data, cb_op, offset, ppage);
2392
2393         RETURN(rc);
2394 }
2395
/**
 * Unlink a file/directory
 *
 * Unlink a file or directory under the parent dir. The unlink request
 * usually will be sent to the MDT where the child is located, but if
 * the client does not have the child FID then the request will be sent to
 * the MDT where the parent is located.
 *
 * If the parent is a striped directory then it also needs to locate the
 * stripe in which the name of the child is located, and replace the parent
 * FID (op_data->op_fid1) with the stripe FID. Note: if the stripe is unknown,
 * it will walk through all of the sub-stripes until the child is finally
 * unlinked.
 *
 * \param[in] exp       export refer to LMV
 * \param[in] op_data   different parameters transferred between client
 *                      MD stacks, name, namelen, FIDs etc.
 *                      op_fid1 is the parent FID, op_fid2 is the child
 *                      FID.
 * \param[out] request  point to the request of unlink.
 *
 * \retval 0            if succeed
 * \retval negative     errno if failed.
 */
static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
                      struct ptlrpc_request **request)
{
        struct obd_device       *obd = exp->exp_obd;
        struct lmv_obd          *lmv = &obd->u.lmv;
        struct lmv_tgt_desc     *tgt = NULL;
        struct lmv_tgt_desc     *parent_tgt = NULL;
        struct mdt_body         *body;
        int                     rc;
        int                     stripe_index = 0;
        struct lmv_stripe_md    *lsm = op_data->op_mea1;
        ENTRY;

        /* Re-entered from the bottom of the function when the reply says the
         * child lives on another MDT (OBD_MD_MDS set in the reply body). */
retry_unlink:
        /* For striped dir, we need to locate the parent as well */
        if (lsm != NULL) {
                struct lmv_tgt_desc *tmp;

                LASSERT(op_data->op_name != NULL &&
                        op_data->op_namelen != 0);

                tmp = lmv_locate_target_for_name(lmv, lsm,
                                                 op_data->op_name,
                                                 op_data->op_namelen,
                                                 &op_data->op_fid1,
                                                 &op_data->op_mds);

                /* return -EBADFD means unknown hash type, might
                 * need try all sub-stripe here */
                if (IS_ERR(tmp) && PTR_ERR(tmp) != -EBADFD)
                        RETURN(PTR_ERR(tmp));

                /* Note: both migrating dir and unknown hash dir need to
                 * try all of sub-stripes, so we need start search the
                 * name from stripe 0, but migrating dir is already handled
                 * inside lmv_locate_target_for_name(), so we only check
                 * unknown hash type directory here */
                if (!lmv_is_known_hash_type(lsm->lsm_md_hash_type)) {
                        struct lmv_oinfo *oinfo;

                        oinfo = &lsm->lsm_md_oinfo[stripe_index];

                        op_data->op_fid1 = oinfo->lmo_fid;
                        op_data->op_mds = oinfo->lmo_mds;
                }
        }

        /* Re-entered after -ENOENT on a directory whose stripes must all be
         * tried; op_fid1/op_mds then already point at the next stripe. */
try_next_stripe:
        /* Send unlink requests to the MDT where the child is located */
        if (likely(!fid_is_zero(&op_data->op_fid2)))
                tgt = lmv_find_target(lmv, &op_data->op_fid2);
        else if (lsm != NULL)
                tgt = lmv_get_target(lmv, op_data->op_mds, NULL);
        else
                tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);

        if (IS_ERR(tgt))
                RETURN(PTR_ERR(tgt));

        op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
        op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
        op_data->op_cap = cfs_curproc_cap_pack();

        /*
         * If child's fid is given, cancel unused locks for it if it is from
         * another export than parent.
         *
         * LOOKUP lock for child (fid3) should also be cancelled on parent
         * tgt in mdc_unlink().
         */
        op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;

        /*
         * Find the target holding the parent (fid1); it is compared with the
         * unlink target below to decide whether a separate LOOKUP cancel is
         * needed.
         */
        parent_tgt = lmv_find_target(lmv, &op_data->op_fid1);
        if (IS_ERR(parent_tgt))
                RETURN(PTR_ERR(parent_tgt));

        /* NOTE(review): the result of this LOOKUP cancel is overwritten by
         * the next lmv_early_cancel() call without being checked - confirm
         * that ignoring a failure here is intended. */
        if (parent_tgt != tgt) {
                rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_idx,
                                      LCK_EX, MDS_INODELOCK_LOOKUP,
                                      MF_MDC_CANCEL_FID3);
        }

        /* Cancel FULL locks on child (fid3). */
        rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
                              MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
        if (rc != 0)
                RETURN(rc);

        CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%u\n",
               PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);

        /* -EREMOTE and -ENOENT are not returned right away: they are
         * examined by the stripe-retry and cross-MDT handling below. */
        rc = md_unlink(tgt->ltd_exp, op_data, request);
        if (rc != 0 && rc != -EREMOTE && rc != -ENOENT)
                RETURN(rc);

        /* Try next stripe if it is needed. */
        if (rc == -ENOENT && lsm != NULL && lmv_need_try_all_stripes(lsm)) {
                struct lmv_oinfo *oinfo;

                stripe_index++;
                if (stripe_index >= lsm->lsm_md_stripe_count)
                        RETURN(rc);

                oinfo = &lsm->lsm_md_oinfo[stripe_index];

                op_data->op_fid1 = oinfo->lmo_fid;
                op_data->op_mds = oinfo->lmo_mds;

                /* Drop the failed request before issuing a new one */
                ptlrpc_req_finished(*request);
                *request = NULL;

                goto try_next_stripe;
        }

        body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
        if (body == NULL)
                RETURN(-EPROTO);

        /* Not cross-ref case, just get out of here. */
        if (likely(!(body->mbo_valid & OBD_MD_MDS)))
                RETURN(rc);

        CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
               exp->exp_obd->obd_name, PFID(&body->mbo_fid1));

        /* This is a remote object, try remote MDT, Note: it may
         * try more than 1 time here, Considering following case
         * /mnt/lustre is root on MDT0, remote1 is on MDT1
         * 1. Initially A does not know where remote1 is, it send
         *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
         *    resend unlink RPC to MDT1 (retry 1st time).
         *
         * 2. During the unlink RPC in flight,
         *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
         *    and create new remote1, but on MDT0
         *
         * 3. MDT1 get unlink RPC(from A), then do remote lock on
         *    /mnt/lustre, then lookup get fid of remote1, and find
         *    it is remote dir again, and reply -EREMOTE again.
         *
         * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
         *
         * In theory, it might try unlimited time here, but it should
         * be very rare case.  */
        op_data->op_fid2 = body->mbo_fid1;
        ptlrpc_req_finished(*request);
        *request = NULL;

        goto retry_unlink;
}
2572
2573 static int lmv_precleanup(struct obd_device *obd)
2574 {
2575         ENTRY;
2576         fld_client_proc_fini(&obd->u.lmv.lmv_fld);
2577         lprocfs_obd_cleanup(obd);
2578         lprocfs_free_md_stats(obd);
2579         RETURN(0);
2580 }
2581
2582 /**
2583  * Get by key a value associated with a LMV device.
2584  *
2585  * Dispatch request to lower-layer devices as needed.
2586  *
2587  * \param[in] env               execution environment for this thread
2588  * \param[in] exp               export for the LMV device
2589  * \param[in] keylen            length of key identifier
2590  * \param[in] key               identifier of key to get value for
2591  * \param[in] vallen            size of \a val
2592  * \param[out] val              pointer to storage location for value
2593  * \param[in] lsm               optional striping metadata of object
2594  *
2595  * \retval 0            on success
2596  * \retval negative     negated errno on failure
2597  */
2598 static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
2599                         __u32 keylen, void *key, __u32 *vallen, void *val)
2600 {
2601         struct obd_device       *obd;
2602         struct lmv_obd          *lmv;
2603         int                      rc = 0;
2604         ENTRY;
2605
2606         obd = class_exp2obd(exp);
2607         if (obd == NULL) {
2608                 CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2609                        exp->exp_handle.h_cookie);
2610                 RETURN(-EINVAL);
2611         }
2612
2613         lmv = &obd->u.lmv;
2614         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2615                 int i;
2616
2617                 LASSERT(*vallen == sizeof(__u32));
2618                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2619                         struct lmv_tgt_desc *tgt = lmv->tgts[i];
2620                         /*
2621                          * All tgts should be connected when this gets called.
2622                          */
2623                         if (tgt == NULL || tgt->ltd_exp == NULL)
2624                                 continue;
2625
2626                         if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
2627                                           vallen, val))
2628                                 RETURN(0);
2629                 }
2630                 RETURN(-EINVAL);
2631         } else if (KEY_IS(KEY_MAX_EASIZE) ||
2632                    KEY_IS(KEY_DEFAULT_EASIZE) ||
2633                    KEY_IS(KEY_CONN_DATA)) {
2634                 /*
2635                  * Forwarding this request to first MDS, it should know LOV
2636                  * desc.
2637                  */
2638                 rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key,
2639                                   vallen, val);
2640                 if (!rc && KEY_IS(KEY_CONN_DATA))
2641                         exp->exp_connect_data = *(struct obd_connect_data *)val;
2642                 RETURN(rc);
2643         } else if (KEY_IS(KEY_TGT_COUNT)) {
2644                 *((int *)val) = lmv->desc.ld_tgt_count;
2645                 RETURN(0);
2646         }
2647
2648         CDEBUG(D_IOCTL, "Invalid key\n");
2649         RETURN(-EINVAL);
2650 }
2651
2652 /**
2653  * Asynchronously set by key a value associated with a LMV device.
2654  *
2655  * Dispatch request to lower-layer devices as needed.
2656  *
2657  * \param[in] env       execution environment for this thread
2658  * \param[in] exp       export for the LMV device
2659  * \param[in] keylen    length of key identifier
2660  * \param[in] key       identifier of key to store value for
2661  * \param[in] vallen    size of value to store
2662  * \param[in] val       pointer to data to be stored
2663  * \param[in] set       optional list of related ptlrpc requests
2664  *
2665  * \retval 0            on success
2666  * \retval negative     negated errno on failure
2667  */
2668 int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
2669                         __u32 keylen, void *key, __u32 vallen, void *val,
2670                         struct ptlrpc_request_set *set)
2671 {
2672         struct lmv_tgt_desc     *tgt = NULL;
2673         struct obd_device       *obd;
2674         struct lmv_obd          *lmv;
2675         int rc = 0;
2676         ENTRY;
2677
2678         obd = class_exp2obd(exp);
2679         if (obd == NULL) {
2680                 CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2681                        exp->exp_handle.h_cookie);
2682                 RETURN(-EINVAL);
2683         }
2684         lmv = &obd->u.lmv;
2685
2686         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX) ||
2687             KEY_IS(KEY_DEFAULT_EASIZE)) {
2688                 int i, err = 0;
2689
2690                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2691                         tgt = lmv->tgts[i];
2692
2693                         if (tgt == NULL || tgt->ltd_exp == NULL)
2694                                 continue;
2695
2696                         err = obd_set_info_async(env, tgt->ltd_exp,
2697                                                  keylen, key, vallen, val, set);
2698                         if (err && rc == 0)
2699                                 rc = err;
2700                 }
2701
2702                 RETURN(rc);
2703         }
2704
2705         RETURN(-EINVAL);
2706 }
2707
2708 static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm,
2709                             const struct lmv_mds_md_v1 *lmm1)
2710 {
2711         struct lmv_obd  *lmv = &exp->exp_obd->u.lmv;
2712         int             stripe_count;
2713         int             cplen;
2714         int             i;
2715         int             rc = 0;
2716         ENTRY;
2717
2718         lsm->lsm_md_magic = le32_to_cpu(lmm1->lmv_magic);
2719         lsm->lsm_md_stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
2720         lsm->lsm_md_master_mdt_index = le32_to_cpu(lmm1->lmv_master_mdt_index);
2721         if (OBD_FAIL_CHECK(OBD_FAIL_UNKNOWN_LMV_STRIPE))
2722                 lsm->lsm_md_hash_type = LMV_HASH_TYPE_UNKNOWN;
2723         else
2724                 lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type);
2725         lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version);
2726         cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name,
2727                         sizeof(lsm->lsm_md_pool_name));
2728
2729         if (cplen >= sizeof(lsm->lsm_md_pool_name))
2730                 RETURN(-E2BIG);
2731
2732         CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %d"
2733                "layout_version %d\n", lsm->lsm_md_stripe_count,
2734                lsm->lsm_md_master_mdt_index, lsm->lsm_md_hash_type,
2735                lsm->lsm_md_layout_version);
2736
2737         stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
2738         for (i = 0; i < stripe_count; i++) {
2739                 fid_le_to_cpu(&lsm->lsm_md_oinfo[i].lmo_fid,
2740                               &lmm1->lmv_stripe_fids[i]);
2741                 rc = lmv_fld_lookup(lmv, &lsm->lsm_md_oinfo[i].lmo_fid,
2742                                     &lsm->lsm_md_oinfo[i].lmo_mds);
2743                 if (rc != 0)
2744                         RETURN(rc);
2745                 CDEBUG(D_INFO, "unpack fid #%d "DFID"\n", i,
2746                        PFID(&lsm->lsm_md_oinfo[i].lmo_fid));
2747         }
2748
2749         RETURN(rc);
2750 }
2751
2752 static int lmv_unpackmd(struct obd_export *exp, struct lmv_stripe_md **lsmp,
2753                         const union lmv_mds_md *lmm, size_t lmm_size)
2754 {
2755         struct lmv_stripe_md     *lsm;
2756         int                      lsm_size;
2757         int                      rc;
2758         bool                     allocated = false;
2759         ENTRY;
2760
2761         LASSERT(lsmp != NULL);
2762
2763         lsm = *lsmp;
2764         /* Free memmd */
2765         if (lsm != NULL && lmm == NULL) {
2766                 int i;
2767                 for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
2768                         /* For migrating inode, the master stripe and master
2769                          * object will be the same, so do not need iput, see
2770                          * ll_update_lsm_md */
2771                         if (!(lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION &&
2772                               i == 0) && lsm->lsm_md_oinfo[i].lmo_root != NULL)
2773                                 iput(lsm->lsm_md_oinfo[i].lmo_root);
2774                 }
2775                 lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count);
2776                 OBD_FREE(lsm, lsm_size);
2777                 *lsmp = NULL;
2778                 RETURN(0);
2779         }
2780
2781         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE)
2782                 RETURN(-EPERM);
2783
2784         /* Unpack memmd */
2785         if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 &&
2786             le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) {
2787                 CERROR("%s: invalid lmv magic %x: rc = %d\n",
2788                        exp->exp_obd->obd_name, le32_to_cpu(lmm->lmv_magic),
2789                        -EIO);
2790                 RETURN(-EIO);
2791         }
2792
2793         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1)
2794                 lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
2795         else
2796                 /**
2797                  * Unpack default dirstripe(lmv_user_md) to lmv_stripe_md,
2798                  * stripecount should be 0 then.
2799                  */
2800                 lsm_size = lmv_stripe_md_size(0);
2801
2802         lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
2803         if (lsm == NULL) {
2804                 OBD_ALLOC(lsm, lsm_size);
2805                 if (lsm == NULL)
2806                         RETURN(-ENOMEM);
2807                 allocated = true;
2808                 *lsmp = lsm;
2809         }
2810
2811         switch (le32_to_cpu(lmm->lmv_magic)) {
2812         case LMV_MAGIC_V1:
2813                 rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1);
2814                 break;
2815         default:
2816                 CERROR("%s: unrecognized magic %x\n", exp->exp_obd->obd_name,
2817                        le32_to_cpu(lmm->lmv_magic));
2818                 rc = -EINVAL;
2819                 break;
2820         }
2821
2822         if (rc != 0 && allocated) {
2823                 OBD_FREE(lsm, lsm_size);
2824                 *lsmp = NULL;
2825                 lsm_size = rc;
2826         }
2827         RETURN(lsm_size);
2828 }
2829
/**
 * Free an in-memory LMV stripe descriptor.
 *
 * Calls lmv_unpackmd() with a NULL wire descriptor, which selects that
 * function's "free memmd" path.
 *
 * \param[in] lsm       stripe descriptor to release
 */
void lmv_free_memmd(struct lmv_stripe_md *lsm)
{
        lmv_unpackmd(NULL, &lsm, NULL, 0);
}
EXPORT_SYMBOL(lmv_free_memmd);
2835
2836 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
2837                              union ldlm_policy_data *policy,
2838                              enum ldlm_mode mode, enum ldlm_cancel_flags flags,
2839                              void *opaque)
2840 {
2841         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
2842         int rc = 0;
2843         __u32 i;
2844         ENTRY;
2845
2846         LASSERT(fid != NULL);
2847
2848         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2849                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
2850                 int err;
2851
2852                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
2853                         continue;
2854
2855                 err = md_cancel_unused(tgt->ltd_exp, fid, policy, mode, flags,
2856                                        opaque);
2857                 if (!rc)
2858                         rc = err;
2859         }
2860         RETURN(rc);
2861 }
2862
2863 static int lmv_set_lock_data(struct obd_export *exp,
2864                              const struct lustre_handle *lockh,
2865                              void *data, __u64 *bits)
2866 {
2867         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
2868         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
2869         int                      rc;
2870         ENTRY;
2871
2872         if (tgt == NULL || tgt->ltd_exp == NULL)
2873                 RETURN(-EINVAL);
2874         rc =  md_set_lock_data(tgt->ltd_exp, lockh, data, bits);
2875         RETURN(rc);
2876 }
2877
2878 enum ldlm_mode lmv_lock_match(struct obd_export *exp, __u64 flags,
2879                               const struct lu_fid *fid, enum ldlm_type type,
2880                               union ldlm_policy_data *policy,
2881                               enum ldlm_mode mode, struct lustre_handle *lockh)
2882 {
2883         struct obd_device       *obd = exp->exp_obd;
2884         struct lmv_obd          *lmv = &obd->u.lmv;
2885         enum ldlm_mode          rc;
2886         int                     tgt;
2887         int                     i;
2888         ENTRY;
2889
2890         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
2891
2892         /*
2893          * With DNE every object can have two locks in different namespaces:
2894          * lookup lock in space of MDT storing direntry and update/open lock in
2895          * space of MDT storing inode.  Try the MDT that the FID maps to first,
2896          * since this can be easily found, and only try others if that fails.
2897          */
2898         for (i = 0, tgt = lmv_find_target_index(lmv, fid);
2899              i < lmv->desc.ld_tgt_count;
2900              i++, tgt = (tgt + 1) % lmv->desc.ld_tgt_count) {
2901                 if (tgt < 0) {
2902                         CDEBUG(D_HA, "%s: "DFID" is inaccessible: rc = %d\n",
2903                                obd->obd_name, PFID(fid), tgt);
2904                         tgt = 0;
2905                 }
2906
2907                 if (lmv->tgts[tgt] == NULL ||
2908                     lmv->tgts[tgt]->ltd_exp == NULL ||
2909                     lmv->tgts[tgt]->ltd_active == 0)
2910                         continue;
2911
2912                 rc = md_lock_match(lmv->tgts[tgt]->ltd_exp, flags, fid,
2913                                    type, policy, mode, lockh);
2914                 if (rc)
2915                         RETURN(rc);
2916         }
2917
2918         RETURN(0);
2919 }
2920
2921 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2922                       struct obd_export *dt_exp, struct obd_export *md_exp,
2923                       struct lustre_md *md)
2924 {
2925         struct lmv_obd          *lmv = &exp->exp_obd->u.lmv;
2926         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
2927
2928         if (tgt == NULL || tgt->ltd_exp == NULL)
2929                 RETURN(-EINVAL);
2930
2931         return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md);
2932 }
2933
2934 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2935 {
2936         struct obd_device       *obd = exp->exp_obd;
2937         struct lmv_obd          *lmv = &obd->u.lmv;
2938         struct lmv_tgt_desc     *tgt = lmv->tgts[0];
2939         ENTRY;
2940
2941         if (md->lmv != NULL) {
2942                 lmv_free_memmd(md->lmv);
2943                 md->lmv = NULL;
2944         }
2945         if (tgt == NULL || tgt->ltd_exp == NULL)
2946                 RETURN(-EINVAL);
2947         RETURN(md_free_lustre_md(lmv->tgts[0]->ltd_exp, md));
2948 }
2949
2950 int lmv_set_open_replay_data(struct obd_export *exp,
2951                              struct obd_client_handle *och,
2952                              struct lookup_intent *it)
2953 {
2954         struct obd_device       *obd = exp->exp_obd;
2955         struct lmv_obd          *lmv = &obd->u.lmv;
2956         struct lmv_tgt_desc     *tgt;
2957         ENTRY;
2958
2959         tgt = lmv_find_target(lmv, &och->och_fid);
2960         if (IS_ERR(tgt))
2961                 RETURN(PTR_ERR(tgt));
2962
2963         RETURN(md_set_open_replay_data(tgt->ltd_exp, och, it));
2964 }
2965
2966 int lmv_clear_open_replay_data(struct obd_export *exp,
2967                                struct obd_client_handle *och)
2968 {
2969         struct obd_device       *obd = exp->exp_obd;
2970         struct lmv_obd          *lmv = &obd->u.lmv;
2971         struct lmv_tgt_desc     *tgt;
2972         ENTRY;
2973
2974         tgt = lmv_find_target(lmv, &och->och_fid);
2975         if (IS_ERR(tgt))
2976                 RETURN(PTR_ERR(tgt));
2977
2978         RETURN(md_clear_open_replay_data(tgt->ltd_exp, och));
2979 }
2980
2981 int lmv_intent_getattr_async(struct obd_export *exp,
2982                              struct md_enqueue_info *minfo)
2983 {
2984         struct md_op_data       *op_data = &minfo->mi_data;
2985         struct obd_device       *obd = exp->exp_obd;
2986         struct lmv_obd          *lmv = &obd->u.lmv;
2987         struct lmv_tgt_desc     *ptgt = NULL;
2988         struct lmv_tgt_desc     *ctgt = NULL;
2989         int                      rc;
2990         ENTRY;
2991
2992         if (!fid_is_sane(&op_data->op_fid2))
2993                 RETURN(-EINVAL);
2994
2995         ptgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2996         if (IS_ERR(ptgt))
2997                 RETURN(PTR_ERR(ptgt));
2998
2999         ctgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
3000         if (IS_ERR(ctgt))
3001                 RETURN(PTR_ERR(ctgt));
3002
3003         /*
3004          * if child is on remote MDT, we need 2 async RPCs to fetch both LOOKUP
3005          * lock on parent, and UPDATE lock on child MDT, which makes all
3006          * complicated. Considering remote dir is rare case, and not supporting
3007          * it in statahead won't cause any issue, drop its support for now.
3008          */
3009         if (ptgt != ctgt)
3010                 RETURN(-ENOTSUPP);
3011
3012         rc = md_intent_getattr_async(ptgt->ltd_exp, minfo);
3013         RETURN(rc);
3014 }
3015
3016 int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
3017                         struct lu_fid *fid, __u64 *bits)
3018 {
3019         struct obd_device       *obd = exp->exp_obd;
3020         struct lmv_obd          *lmv = &obd->u.lmv;
3021         struct lmv_tgt_desc     *tgt;
3022         int                      rc;
3023         ENTRY;
3024
3025         tgt = lmv_find_target(lmv, fid);
3026         if (IS_ERR(tgt))
3027                 RETURN(PTR_ERR(tgt));
3028
3029         rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
3030         RETURN(rc);
3031 }
3032
3033 int lmv_get_fid_from_lsm(struct obd_export *exp,
3034                          const struct lmv_stripe_md *lsm,
3035                          const char *name, int namelen, struct lu_fid *fid)
3036 {
3037         const struct lmv_oinfo *oinfo;
3038
3039         LASSERT(lsm != NULL);
3040         oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
3041         if (IS_ERR(oinfo))
3042                 return PTR_ERR(oinfo);
3043
3044         *fid = oinfo->lmo_fid;
3045
3046         RETURN(0);
3047 }
3048
3049 /**
3050  * For lmv, only need to send request to master MDT, and the master MDT will
3051  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
3052  * we directly fetch data from the slave MDTs.
3053  */
3054 int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
3055                  struct obd_quotactl *oqctl)
3056 {
3057         struct obd_device   *obd = class_exp2obd(exp);
3058         struct lmv_obd      *lmv = &obd->u.lmv;
3059         struct lmv_tgt_desc *tgt = lmv->tgts[0];
3060         int                  rc = 0;
3061         __u32                i;
3062         __u64                curspace, curinodes;
3063         ENTRY;
3064
3065         if (tgt == NULL ||
3066             tgt->ltd_exp == NULL ||
3067             !tgt->ltd_active ||
3068             lmv->desc.ld_tgt_count == 0) {
3069                 CERROR("master lmv inactive\n");
3070                 RETURN(-EIO);
3071         }
3072
3073         if (oqctl->qc_cmd != Q_GETOQUOTA) {
3074                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
3075                 RETURN(rc);
3076         }
3077
3078         curspace = curinodes = 0;
3079         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
3080                 int err;
3081                 tgt = lmv->tgts[i];
3082
3083                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active)
3084                         continue;
3085
3086                 err = obd_quotactl(tgt->ltd_exp, oqctl);
3087                 if (err) {
3088                         CERROR("getquota on mdt %d failed. %d\n", i, err);
3089                         if (!rc)
3090                                 rc = err;
3091                 } else {
3092                         curspace += oqctl->qc_dqblk.dqb_curspace;
3093                         curinodes += oqctl->qc_dqblk.dqb_curinodes;
3094                 }
3095         }
3096         oqctl->qc_dqblk.dqb_curspace = curspace;
3097         oqctl->qc_dqblk.dqb_curinodes = curinodes;
3098
3099         RETURN(rc);
3100 }
3101
3102 static int lmv_merge_attr(struct obd_export *exp,
3103                           const struct lmv_stripe_md *lsm,
3104                           struct cl_attr *attr,
3105                           ldlm_blocking_callback cb_blocking)
3106 {
3107         int rc;
3108         int i;
3109
3110         rc = lmv_revalidate_slaves(exp, lsm, cb_blocking, 0);
3111         if (rc < 0)
3112                 return rc;
3113
3114         for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
3115                 struct inode *inode = lsm->lsm_md_oinfo[i].lmo_root;
3116
3117                 CDEBUG(D_INFO, ""DFID" size %llu, blocks %llu nlink %u,"
3118                        " atime %lu ctime %lu, mtime %lu.\n",
3119                        PFID(&lsm->lsm_md_oinfo[i].lmo_fid),
3120                        i_size_read(inode), (unsigned long long)inode->i_blocks,
3121                        inode->i_nlink, LTIME_S(inode->i_atime),
3122                        LTIME_S(inode->i_ctime), LTIME_S(inode->i_mtime));
3123
3124                 /* for slave stripe, it needs to subtract nlink for . and .. */
3125                 if (i != 0)
3126                         attr->cat_nlink += inode->i_nlink - 2;
3127                 else
3128                         attr->cat_nlink = inode->i_nlink;
3129
3130                 attr->cat_size += i_size_read(inode);
3131                 attr->cat_blocks += inode->i_blocks;
3132
3133                 if (attr->cat_atime < LTIME_S(inode->i_atime))
3134                         attr->cat_atime = LTIME_S(inode->i_atime);
3135
3136                 if (attr->cat_ctime < LTIME_S(inode->i_ctime))
3137                         attr->cat_ctime = LTIME_S(inode->i_ctime);
3138
3139                 if (attr->cat_mtime < LTIME_S(inode->i_mtime))
3140                         attr->cat_mtime = LTIME_S(inode->i_mtime);
3141         }
3142         return 0;
3143 }
3144
3145 struct obd_ops lmv_obd_ops = {
3146         .o_owner                = THIS_MODULE,
3147         .o_setup                = lmv_setup,
3148         .o_cleanup              = lmv_cleanup,
3149         .o_precleanup           = lmv_precleanup,
3150         .o_process_config       = lmv_process_config,
3151         .o_connect              = lmv_connect,
3152         .o_disconnect           = lmv_disconnect,
3153         .o_statfs               = lmv_statfs,
3154         .o_get_info             = lmv_get_info,
3155         .o_set_info_async       = lmv_set_info_async,
3156         .o_notify               = lmv_notify,
3157         .o_get_uuid             = lmv_get_uuid,
3158         .o_iocontrol            = lmv_iocontrol,
3159         .o_quotactl             = lmv_quotactl
3160 };
3161
3162 struct md_ops lmv_md_ops = {
3163         .m_get_root             = lmv_get_root,
3164         .m_null_inode           = lmv_null_inode,
3165         .m_close                = lmv_close,
3166         .m_create               = lmv_create,
3167         .m_enqueue              = lmv_enqueue,
3168         .m_getattr              = lmv_getattr,
3169         .m_getxattr             = lmv_getxattr,
3170         .m_getattr_name         = lmv_getattr_name,
3171         .m_intent_lock          = lmv_intent_lock,
3172         .m_link                 = lmv_link,
3173         .m_rename               = lmv_rename,
3174         .m_setattr              = lmv_setattr,
3175         .m_setxattr             = lmv_setxattr,
3176         .m_fsync                = lmv_fsync,
3177         .m_read_page            = lmv_read_page,
3178         .m_unlink               = lmv_unlink,
3179         .m_init_ea_size         = lmv_init_ea_size,
3180         .m_cancel_unused        = lmv_cancel_unused,
3181         .m_set_lock_data        = lmv_set_lock_data,
3182         .m_lock_match           = lmv_lock_match,
3183         .m_get_lustre_md        = lmv_get_lustre_md,
3184         .m_free_lustre_md       = lmv_free_lustre_md,
3185         .m_merge_attr           = lmv_merge_attr,
3186         .m_set_open_replay_data = lmv_set_open_replay_data,
3187         .m_clear_open_replay_data = lmv_clear_open_replay_data,
3188         .m_intent_getattr_async = lmv_intent_getattr_async,
3189         .m_revalidate_lock      = lmv_revalidate_lock,
3190         .m_get_fid_from_lsm     = lmv_get_fid_from_lsm,
3191         .m_unpackmd             = lmv_unpackmd,
3192 };
3193
3194 static int __init lmv_init(void)
3195 {
3196         return class_register_type(&lmv_obd_ops, &lmv_md_ops, true, NULL,
3197                                    LUSTRE_LMV_NAME, NULL);
3198 }
3199
3200 static void __exit lmv_exit(void)
3201 {
3202         class_unregister_type(LUSTRE_LMV_NAME);
3203 }
3204
3205 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3206 MODULE_DESCRIPTION("Lustre Logical Metadata Volume");
3207 MODULE_VERSION(LUSTRE_VERSION_STRING);
3208 MODULE_LICENSE("GPL");
3209
3210 module_init(lmv_init);
3211 module_exit(lmv_exit);