Whamcloud - gitweb
Holding lprocfs lock with send rpc can produce block for destroy
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003, 2004, 2005, 2006 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <linux/mm.h>
33 #include <asm/div64.h>
34 #include <linux/seq_file.h>
35 #include <linux/namei.h>
36 #else
37 #include <liblustre.h>
38 #endif
39
40 #include <linux/ext2_fs.h>
41
42 #include <lustre/lustre_idl.h>
43 #include <lustre_log.h>
44 #include <obd_support.h>
45 #include <lustre_lib.h>
46 #include <lustre_net.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <lustre_lite.h>
50 #include <lustre_fid.h>
51 #include "lmv_internal.h"
52
53 /* not defined for liblustre building */
54 #if !defined(ATOMIC_INIT)
55 #define ATOMIC_INIT(val) { (val) }
56 #endif
57
58 /* object cache. */
59 cfs_mem_cache_t *obj_cache;
60 atomic_t obj_cache_count = ATOMIC_INIT(0);
61
62 static void lmv_activate_target(struct lmv_obd *lmv,
63                                 struct lmv_tgt_desc *tgt,
64                                 int activate)
65 {
66         if (tgt->ltd_active == activate)
67                 return;
68
69         tgt->ltd_active = activate;
70         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
71 }
72
73 /* Error codes:
74  *
75  *  -EINVAL  : UUID can't be found in the LMV's target list
76  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
77  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
78  */
79 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
80                               int activate)
81 {
82         struct lmv_tgt_desc *tgt;
83         struct obd_device *obd;
84         int i, rc = 0;
85         ENTRY;
86
87         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
88                lmv, uuid->uuid, activate);
89
90         spin_lock(&lmv->lmv_lock);
91         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
92                 if (tgt->ltd_exp == NULL)
93                         continue;
94
95                 CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n",
96                        i, tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
97
98                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
99                         break;
100         }
101
102         if (i == lmv->desc.ld_tgt_count)
103                 GOTO(out_lmv_lock, rc = -EINVAL);
104
105         obd = class_exp2obd(tgt->ltd_exp);
106         if (obd == NULL)
107                 GOTO(out_lmv_lock, rc = -ENOTCONN);
108
109         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
110                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
111                obd->obd_type->typ_name, i);
112         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
113
114         if (tgt->ltd_active == activate) {
115                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
116                        activate ? "" : "in");
117                 GOTO(out_lmv_lock, rc);
118         }
119
120         CDEBUG(D_INFO, "Marking OBD %p %sactive\n",
121                obd, activate ? "" : "in");
122
123         lmv_activate_target(lmv, tgt, activate);
124
125         EXIT;
126
127  out_lmv_lock:
128         spin_unlock(&lmv->lmv_lock);
129         return rc;
130 }
131
132 static int lmv_set_mdc_data(struct lmv_obd *lmv, struct obd_uuid *uuid,
133                             struct obd_connect_data *data)
134 {
135         struct lmv_tgt_desc *tgt;
136         int i;
137         ENTRY;
138
139         LASSERT(data != NULL);
140
141         spin_lock(&lmv->lmv_lock);
142         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
143                 if (tgt->ltd_exp == NULL)
144                         continue;
145
146                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid)) {
147                         lmv->datas[tgt->ltd_idx] = *data;
148                         break;
149                 }
150         }
151         spin_unlock(&lmv->lmv_lock);
152         RETURN(0);
153 }
154
155 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
156                       enum obd_notify_event ev, void *data)
157 {
158         struct lmv_obd *lmv = &obd->u.lmv;
159         struct obd_uuid *uuid;
160         int rc = 0;
161         ENTRY;
162
163         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
164                 CERROR("unexpected notification of %s %s!\n",
165                        watched->obd_type->typ_name,
166                        watched->obd_name);
167                 RETURN(-EINVAL);
168         }
169
170         uuid = &watched->u.cli.cl_target_uuid;
171         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
172                 /*
173                  * Set MDC as active before notifying the observer, so the
174                  * observer can use the MDC normally.
175                  */
176                 rc = lmv_set_mdc_active(lmv, uuid,
177                                         ev == OBD_NOTIFY_ACTIVE);
178                 if (rc) {
179                         CERROR("%sactivation of %s failed: %d\n",
180                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
181                                uuid->uuid, rc);
182                         RETURN(rc);
183                 }
184         } else if (ev == OBD_NOTIFY_OCD) {
185                 struct obd_connect_data *conn_data =
186                         &watched->u.cli.cl_import->imp_connect_data;
187
188                 /* Set connect data to desired target, update
189                  * exp_connect_flags. */
190                 rc = lmv_set_mdc_data(lmv, uuid, conn_data);
191                 if (rc) {
192                         CERROR("can't set connect data to target %s, rc %d\n",
193                                uuid->uuid, rc);
194                         RETURN(rc);
195                 }
196
197                 /*
198                  * XXX: Make sure that ocd_connect_flags from all targets are
199                  * the same. Otherwise one of MDTs runs wrong version or
200                  * something like this.  --umka
201                  */
202                 obd->obd_self_export->exp_connect_flags =
203                         conn_data->ocd_connect_flags;
204         }
205 #if 0
206         else if (ev == OBD_NOTIFY_DISCON) {
207                 /* For disconnect event, flush fld cache for failout MDS case. */
208                 fld_client_flush(&lmv->lmv_fld);
209         }
210 #endif
211         /* Pass the notification up the chain. */
212         if (obd->obd_observer)
213                 rc = obd_notify(obd->obd_observer, watched, ev, data);
214
215         RETURN(rc);
216 }
217
218 /* this is fake connect function. Its purpose is to initialize lmv and say
219  * caller that everything is okay. Real connection will be performed later. */
220 static int lmv_connect(const struct lu_env *env,
221                        struct lustre_handle *conn, struct obd_device *obd,
222                        struct obd_uuid *cluuid, struct obd_connect_data *data)
223 {
224 #ifdef __KERNEL__
225         struct proc_dir_entry *lmv_proc_dir;
226 #endif
227         struct lmv_obd *lmv = &obd->u.lmv;
228         struct obd_export *exp;
229         int rc = 0;
230         ENTRY;
231
232         rc = class_connect(conn, obd, cluuid);
233         if (rc) {
234                 CERROR("class_connection() returned %d\n", rc);
235                 RETURN(rc);
236         }
237
238         exp = class_conn2export(conn);
239
240         /* we don't want to actually do the underlying connections more than
241          * once, so keep track. */
242         lmv->refcount++;
243         if (lmv->refcount > 1) {
244                 class_export_put(exp);
245                 RETURN(0);
246         }
247
248         lmv->exp = exp;
249         lmv->connected = 0;
250         lmv->cluuid = *cluuid;
251
252         if (data)
253                 lmv->conn_data = *data;
254
255 #ifdef __KERNEL__
256         lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
257                                         NULL, NULL);
258         if (IS_ERR(lmv_proc_dir)) {
259                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
260                        obd->obd_type->typ_name, obd->obd_name);
261                 lmv_proc_dir = NULL;
262         }
263 #endif
264
265         /* all real clients should perform actual connection right away, because
266          * it is possible, that LMV will not have opportunity to connect targets
267          * and MDC stuff will be called directly, for instance while reading
268          * ../mdc/../kbytesfree procfs file, etc. */
269         if (data->ocd_connect_flags & OBD_CONNECT_REAL)
270                 rc = lmv_check_connect(obd);
271
272 #ifdef __KERNEL__
273         if (rc) {
274                 if (lmv_proc_dir)
275                         lprocfs_remove(&lmv_proc_dir);
276         }
277 #endif
278
279         RETURN(rc);
280 }
281
282 static void lmv_set_timeouts(struct obd_device *obd)
283 {
284         struct lmv_tgt_desc *tgts;
285         struct lmv_obd *lmv;
286         int i;
287
288         lmv = &obd->u.lmv;
289         if (lmv->server_timeout == 0)
290                 return;
291
292         if (lmv->connected == 0)
293                 return;
294
295         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
296                 if (tgts->ltd_exp == NULL)
297                         continue;
298
299                 obd_set_info_async(tgts->ltd_exp, strlen("inter_mds"),
300                                    "inter_mds", 0, NULL, NULL);
301         }
302 }
303
304 static int lmv_init_ea_size(struct obd_export *exp, int easize,
305                             int def_easize, int cookiesize)
306 {
307         struct obd_device *obd = exp->exp_obd;
308         struct lmv_obd *lmv = &obd->u.lmv;
309         int i, rc = 0, change = 0;
310         ENTRY;
311
312         if (lmv->max_easize < easize) {
313                 lmv->max_easize = easize;
314                 change = 1;
315         }
316         if (lmv->max_def_easize < def_easize) {
317                 lmv->max_def_easize = def_easize;
318                 change = 1;
319         }
320         if (lmv->max_cookiesize < cookiesize) {
321                 lmv->max_cookiesize = cookiesize;
322                 change = 1;
323         }
324         if (change == 0)
325                 RETURN(0);
326
327         if (lmv->connected == 0)
328                 RETURN(0);
329
330         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
331                 if (lmv->tgts[i].ltd_exp == NULL) {
332                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
333                         continue;
334                 }
335
336                 rc = md_init_ea_size(lmv->tgts[i].ltd_exp, easize, def_easize,
337                                      cookiesize);
338                 if (rc) {
339                         CERROR("obd_init_ea_size() failed on MDT target %d, "
340                                "error %d.\n", i, rc);
341                         break;
342                 }
343         }
344         RETURN(rc);
345 }
346
347 #define MAX_STRING_SIZE 128
348
349 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
350 {
351         struct lmv_obd *lmv = &obd->u.lmv;
352         struct obd_uuid *cluuid = &lmv->cluuid;
353         struct obd_connect_data *mdc_data = NULL;
354         struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" };
355         struct lustre_handle conn = {0, };
356         struct obd_device *mdc_obd;
357         struct obd_export *mdc_exp;
358         struct lu_fld_target target;
359         int rc;
360 #ifdef __KERNEL__
361         struct proc_dir_entry *lmv_proc_dir;
362 #endif
363         ENTRY;
364
365         /* for MDS: don't connect to yourself */
366         if (obd_uuid_equals(&tgt->ltd_uuid, cluuid)) {
367                 CDEBUG(D_CONFIG, "don't connect back to %s\n", cluuid->uuid);
368                 /* XXX - the old code didn't increment active tgt count.
369                  *       should we ? */
370                 RETURN(0);
371         }
372
373         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
374                                         &obd->obd_uuid);
375         if (!mdc_obd) {
376                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
377                 RETURN(-EINVAL);
378         }
379
380         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
381                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
382                 tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
383                 cluuid->uuid);
384
385         if (!mdc_obd->obd_set_up) {
386                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
387                 RETURN(-EINVAL);
388         }
389
390         rc = obd_connect(NULL, &conn, mdc_obd, &lmv_mdc_uuid,
391                          &lmv->conn_data);
392         if (rc) {
393                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
394                 RETURN(rc);
395         }
396
397         mdc_exp = class_conn2export(&conn);
398
399         /* Init fid sequence client for this mdc. */
400         rc = obd_fid_init(mdc_exp);
401         if (rc)
402                 RETURN(rc);
403
404         /* Add new FLD target. */
405         target.ft_srv = NULL;
406         target.ft_exp = mdc_exp;
407         target.ft_idx = tgt->ltd_idx;
408
409         fld_client_add_target(&lmv->lmv_fld, &target);
410
411         mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
412
413         rc = obd_register_observer(mdc_obd, obd);
414         if (rc) {
415                 obd_disconnect(mdc_exp);
416                 CERROR("target %s register_observer error %d\n",
417                        tgt->ltd_uuid.uuid, rc);
418                 RETURN(rc);
419         }
420
421         if (obd->obd_observer) {
422                 /* tell the mds_lmv about the new target */
423                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
424                                 OBD_NOTIFY_ACTIVE, (void *)(tgt - lmv->tgts));
425                 if (rc) {
426                         obd_disconnect(mdc_exp);
427                         RETURN(rc);
428                 }
429         }
430
431         tgt->ltd_active = 1;
432         tgt->ltd_exp = mdc_exp;
433         lmv->desc.ld_active_tgt_count++;
434
435         /* copy connect data, it may be used later */
436         lmv->datas[tgt->ltd_idx] = *mdc_data;
437
438         md_init_ea_size(tgt->ltd_exp, lmv->max_easize,
439                         lmv->max_def_easize, lmv->max_cookiesize);
440
441         CDEBUG(D_CONFIG, "connected to %s(%s) successfully (%d)\n",
442                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
443                 atomic_read(&obd->obd_refcount));
444
445 #ifdef __KERNEL__
446         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
447         if (lmv_proc_dir) {
448                 struct proc_dir_entry *mdc_symlink;
449                 char name[MAX_STRING_SIZE + 1];
450
451                 LASSERT(mdc_obd->obd_type != NULL);
452                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
453                 name[MAX_STRING_SIZE] = '\0';
454                 snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
455                          mdc_obd->obd_type->typ_name,
456                          mdc_obd->obd_name);
457                 mdc_symlink = proc_symlink(mdc_obd->obd_name,
458                                            lmv_proc_dir, name);
459                 if (mdc_symlink == NULL) {
460                         CERROR("could not register LMV target "
461                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
462                                obd->obd_type->typ_name, obd->obd_name,
463                                mdc_obd->obd_name);
464                         lprocfs_remove(&lmv_proc_dir);
465                         lmv_proc_dir = NULL;
466                 }
467         }
468 #endif
469         RETURN(0);
470 }
471
472 int lmv_add_target(struct obd_device *obd, struct obd_uuid *tgt_uuid)
473 {
474         struct lmv_obd *lmv = &obd->u.lmv;
475         struct lmv_tgt_desc *tgt;
476         int rc = 0;
477         ENTRY;
478
479         CDEBUG(D_CONFIG, "tgt_uuid: %s.\n", tgt_uuid->uuid);
480
481         lmv_init_lock(lmv);
482
483         if (lmv->desc.ld_active_tgt_count >= LMV_MAX_TGT_COUNT) {
484                 lmv_init_unlock(lmv);
485                 CERROR("can't add %s, LMV module compiled for %d MDCs. "
486                        "That many MDCs already configured.\n",
487                        tgt_uuid->uuid, LMV_MAX_TGT_COUNT);
488                 RETURN(-EINVAL);
489         }
490         if (lmv->desc.ld_tgt_count == 0) {
491                 struct obd_device *mdc_obd;
492
493                 mdc_obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME,
494                                                 &obd->obd_uuid);
495                 if (!mdc_obd) {
496                         lmv_init_unlock(lmv);
497                         CERROR("Target %s not attached\n", tgt_uuid->uuid);
498                         RETURN(-EINVAL);
499                 }
500
501                 rc = obd_llog_init(obd, OBD_LLOG_GROUP, mdc_obd, 0, NULL, tgt_uuid);
502                 if (rc) {
503                         lmv_init_unlock(lmv);
504                         CERROR("lmv failed to setup llogging subsystems\n");
505                 }
506         }
507         spin_lock(&lmv->lmv_lock);
508         tgt = lmv->tgts + lmv->desc.ld_tgt_count++;
509         tgt->ltd_uuid = *tgt_uuid;
510         spin_unlock(&lmv->lmv_lock);
511
512         if (lmv->connected) {
513                 rc = lmv_connect_mdc(obd, tgt);
514                 if (rc) {
515                         spin_lock(&lmv->lmv_lock);
516                         lmv->desc.ld_tgt_count--;
517                         memset(tgt, 0, sizeof(*tgt));
518                         spin_unlock(&lmv->lmv_lock);
519                 } else {
520                         int easize = sizeof(struct lmv_stripe_md) +
521                                      lmv->desc.ld_tgt_count *
522                                      sizeof(struct lu_fid);
523                         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
524                 }
525         }
526
527         lmv_init_unlock(lmv);
528         RETURN(rc);
529 }
530
531 /* performs a check if passed obd is connected. If no - connect it. */
532 int lmv_check_connect(struct obd_device *obd)
533 {
534         struct lmv_obd *lmv = &obd->u.lmv;
535         struct lmv_tgt_desc *tgt;
536         int i, rc, easize;
537         ENTRY;
538
539         if (lmv->connected)
540                 RETURN(0);
541
542         lmv_init_lock(lmv);
543         if (lmv->connected) {
544                 lmv_init_unlock(lmv);
545                 RETURN(0);
546         }
547
548         if (lmv->desc.ld_tgt_count == 0) {
549                 CERROR("%s: no targets configured.\n", obd->obd_name);
550                 RETURN(-EINVAL);
551         }
552
553         CDEBUG(D_CONFIG, "time to connect %s to %s\n",
554                lmv->cluuid.uuid, obd->obd_name);
555
556         LASSERT(lmv->tgts != NULL);
557
558         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
559                 rc = lmv_connect_mdc(obd, tgt);
560                 if (rc)
561                         GOTO(out_disc, rc);
562         }
563
564         lmv_set_timeouts(obd);
565         class_export_put(lmv->exp);
566         lmv->connected = 1;
567         easize = lmv_get_easize(lmv);
568         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
569         lmv_init_unlock(lmv);
570         RETURN(0);
571
572  out_disc:
573         while (i-- > 0) {
574                 int rc2;
575                 --tgt;
576                 tgt->ltd_active = 0;
577                 if (tgt->ltd_exp) {
578                         --lmv->desc.ld_active_tgt_count;
579                         rc2 = obd_disconnect(tgt->ltd_exp);
580                         if (rc2) {
581                                 CERROR("error: LMV target %s disconnect on "
582                                        "MDC idx %d: error %d\n",
583                                        tgt->ltd_uuid.uuid, i, rc2);
584                         }
585                 }
586         }
587         class_disconnect(lmv->exp);
588         lmv_init_unlock(lmv);
589         RETURN(rc);
590 }
591
592 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
593 {
594 #ifdef __KERNEL__
595         struct proc_dir_entry *lmv_proc_dir;
596 #endif
597         struct lmv_obd *lmv = &obd->u.lmv;
598         struct obd_device *mdc_obd;
599         int rc;
600         ENTRY;
601
602         LASSERT(tgt != NULL);
603         LASSERT(obd != NULL);
604
605         mdc_obd = class_exp2obd(tgt->ltd_exp);
606
607         if (mdc_obd)
608                 mdc_obd->obd_no_recov = obd->obd_no_recov;
609
610 #ifdef __KERNEL__
611         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
612         if (lmv_proc_dir) {
613                 struct proc_dir_entry *mdc_symlink;
614
615                 mdc_symlink = lprocfs_srch(lmv_proc_dir, mdc_obd->obd_name);
616                 if (mdc_symlink) {
617                         lprocfs_remove(&mdc_symlink);
618                 } else {
619                         CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
620                                obd->obd_type->typ_name, obd->obd_name,
621                                mdc_obd->obd_name);
622                 }
623         }
624 #endif
625         rc = obd_fid_fini(tgt->ltd_exp);
626         if (rc)
627                 CERROR("Can't finanize fids factory\n");
628
629         CDEBUG(D_OTHER, "Disconnected from %s(%s) successfully\n",
630                tgt->ltd_exp->exp_obd->obd_name,
631                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
632
633         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
634         rc = obd_disconnect(tgt->ltd_exp);
635         if (rc) {
636                 if (tgt->ltd_active) {
637                         CERROR("Target %s disconnect error %d\n",
638                                tgt->ltd_uuid.uuid, rc);
639                 }
640         }
641
642         lmv_activate_target(lmv, tgt, 0);
643         tgt->ltd_exp = NULL;
644         RETURN(0);
645 }
646
647 static int lmv_disconnect(struct obd_export *exp)
648 {
649         struct obd_device *obd = class_exp2obd(exp);
650 #ifdef __KERNEL__
651         struct proc_dir_entry *lmv_proc_dir;
652 #endif
653         struct lmv_obd *lmv = &obd->u.lmv;
654         int rc, i;
655         ENTRY;
656
657         if (!lmv->tgts)
658                 goto out_local;
659
660         /* Only disconnect the underlying layers on the final disconnect. */
661         lmv->refcount--;
662         if (lmv->refcount != 0)
663                 goto out_local;
664
665         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
666                 if (lmv->tgts[i].ltd_exp == NULL)
667                         continue;
668                 lmv_disconnect_mdc(obd, &lmv->tgts[i]);
669         }
670
671 #ifdef __KERNEL__
672         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
673         if (lmv_proc_dir) {
674                 lprocfs_remove(&lmv_proc_dir);
675         } else {
676                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
677                        obd->obd_type->typ_name, obd->obd_name);
678         }
679 #endif
680
681 out_local:
682         /*
683          * This is the case when no real connection is established by
684          * lmv_check_connect().
685          */
686         if (!lmv->connected)
687                 class_export_put(exp);
688         rc = class_disconnect(exp);
689         if (lmv->refcount == 0)
690                 lmv->connected = 0;
691         RETURN(rc);
692 }
693
694 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
695                          int len, void *karg, void *uarg)
696 {
697         struct obd_device *obddev = class_exp2obd(exp);
698         struct lmv_obd *lmv = &obddev->u.lmv;
699         int i, rc = 0, set = 0;
700         ENTRY;
701
702         if (lmv->desc.ld_tgt_count == 0)
703                 RETURN(-ENOTTY);
704
705         switch (cmd) {
706         case IOC_OBD_STATFS: {
707                 struct obd_ioctl_data *data = karg;
708                 struct obd_device *mdc_obd;
709                 struct obd_statfs stat_buf = {0};
710                 __u32 index;
711
712                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
713                 LASSERT(data->ioc_plen1 == sizeof(struct obd_statfs));
714
715                 if ((index >= lmv->desc.ld_tgt_count))
716                         RETURN(-ENODEV);
717
718                 if (!lmv->tgts[index].ltd_active)
719                         RETURN(-ENODATA);
720
721                 mdc_obd = class_exp2obd(lmv->tgts[index].ltd_exp);
722                 if (!mdc_obd)
723                         RETURN(-EINVAL);
724
725                 /* got statfs data */
726                 rc = obd_statfs(mdc_obd, &stat_buf,
727                                 cfs_time_current_64() - HZ, 0);
728                 if (rc)
729                         RETURN(rc);
730                 if (copy_to_user(data->ioc_pbuf1, &stat_buf, data->ioc_plen1))
731                         RETURN(rc);
732                 /* copy UUID */
733                 rc = copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
734                                   data->ioc_plen2);
735                 break;
736         }
737         default : {
738                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
739                         int err;
740
741                         if (lmv->tgts[i].ltd_exp == NULL)
742                                 continue;
743
744                         err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len,
745                                             karg, uarg);
746                         if (err) {
747                                 if (lmv->tgts[i].ltd_active) {
748                                         CERROR("error: iocontrol MDC %s on MDT"
749                                                "idx %d cmd %x: err = %d\n",
750                                                 lmv->tgts[i].ltd_uuid.uuid,
751                                                 i, cmd, err);
752                                         if (!rc)
753                                                 rc = err;
754                                 }
755                         } else
756                                 set = 1;
757                 }
758                 if (!set && !rc)
759                         rc = -EIO;
760         }
761         }
762         RETURN(rc);
763 }
764
/* Selectors understood by lmv_choose_mds(). */
enum MDS_POLICY {
        CHAR_TYPE = 0,  /* pick by the characters of the name */
        NID_TYPE  = 1   /* pick by the NID-derived id */
};
769
/*
 * Map a name onto one of @count slots: the first @len characters of @name
 * are added up in an unsigned accumulator and the sum is reduced modulo
 * @count. A non-positive @len gives 0. @count must not be zero.
 */
static int lmv_all_chars_policy(int count, const char *name,
                                int len)
{
        unsigned int sum = 0;
        int pos;

        for (pos = 0; pos < len; pos++)
                sum += name[pos];

        return sum % count;
}
780
781 static int lmv_nid_policy(struct lmv_obd *lmv)
782 {
783         struct obd_import *imp = class_exp2cliimp(lmv->tgts[0].ltd_exp);
784         __u32 id;
785         /*
786          * XXX Hack: to get nid we assume that underlying obd device is mdc.
787          */
788         id = imp->imp_connection->c_self ^ (imp->imp_connection->c_self >> 32);
789         return id % lmv->desc.ld_tgt_count;
790 }
791
792 static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
793                           int type)
794 {
795         switch (type) {
796         case CHAR_TYPE:
797                 return lmv_all_chars_policy(lmv->desc.ld_tgt_count,
798                                             op_data->op_name,
799                                             op_data->op_namelen);
800         case NID_TYPE:
801                 return lmv_nid_policy(lmv);
802
803         default:
804                 break;
805         }
806
807         CERROR("unsupport type %d \n", type);
808         return -EINVAL;
809 }
810
811 /* This is _inode_ placement policy function (not name). */
812 static int lmv_placement_policy(struct obd_device *obd,
813                                 struct md_op_data *op_data,
814                                 mdsno_t *mds)
815 {
816         struct lmv_obd *lmv = &obd->u.lmv;
817         struct lmv_obj *obj;
818         int rc;
819         ENTRY;
820
821         LASSERT(mds != NULL);
822
823         /*
824          * Allocate new fid on target according to operation type and parent
825          * home mds.
826          */
827         obj = lmv_obj_grab(obd, &op_data->op_fid1);
828         if (obj != NULL || op_data->op_name == NULL ||
829             op_data->op_opc != LUSTRE_OPC_MKDIR) {
830                 /*
831                  * Allocate fid for non-dir or for null name or for case parent
832                  * dir is split.
833                  */
834                 if (obj) {
835                         lmv_obj_put(obj);
836
837                         /*
838                          * If we have this flag turned on, and we see that
839                          * parent dir is split, this means, that caller did not
840                          * notice split yet. This is race and we would like to
841                          * let caller know that.
842                          */
843                         if (op_data->op_bias & MDS_CHECK_SPLIT)
844                                 RETURN(-ERESTART);
845                 }
846
847                 /*
848                  * Allocate new fid on same mds where parent fid is located and
849                  * where operation will be sent. In case of split dir, ->op_fid1
850                  * and ->op_mds here will contain fid and mds of slave directory
851                  * object (assigned by caller).
852                  */
853                 *mds = op_data->op_mds;
854                 rc = 0;
855
856 #if 0
857                 /* XXX: This should be removed later wehn we sure it is not
858                  * needed. */
859                 rc = lmv_fld_lookup(lmv, &op_data->op_fid1, mds);
860                 if (rc)
861                         GOTO(out, rc);
862 #endif
863         } else {
864                 /*
865                  * Parent directory is not split and we want to create a
866                  * directory in it. Let's calculate where to place it according
867                  * to name.
868                  */
869                 *mds = lmv_choose_mds(lmv, op_data, NID_TYPE);
870                 rc = 0;
871         }
872         EXIT;
873 #if 0
874 out:
875 #endif
876         if (rc) {
877                 CERROR("Can't choose MDS, err = %d\n", rc);
878         } else {
879                 LASSERT(*mds < lmv->desc.ld_tgt_count);
880         }
881
882         return rc;
883 }
884
885 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
886                     mdsno_t mds)
887 {
888         struct lmv_tgt_desc *tgt = &lmv->tgts[mds];
889         int rc;
890         ENTRY;
891
892         /* New seq alloc and FLD setup should be atomic. */
893         down(&tgt->ltd_fid_sem);
894
895         /* Asking underlaying tgt layer to allocate new fid. */
896         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
897         if (rc > 0) {
898                 LASSERT(fid_is_sane(fid));
899
900                 /* Client switches to new sequence, setup FLD. */
901                 rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
902                                        mds, NULL);
903                 if (rc) {
904                         CERROR("Can't create fld entry, rc %d\n", rc);
905                         /* Delete just allocated fid sequence */
906                         obd_fid_delete(tgt->ltd_exp, NULL);
907                 }
908         }
909         up(&tgt->ltd_fid_sem);
910         RETURN(rc);
911 }
912
913 int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
914                   struct md_op_data *op_data)
915 {
916         struct obd_device *obd = class_exp2obd(exp);
917         struct lmv_obd *lmv = &obd->u.lmv;
918         mdsno_t mds;
919         int rc;
920         ENTRY;
921
922         LASSERT(op_data != NULL);
923         LASSERT(fid != NULL);
924
925         rc = lmv_placement_policy(obd, op_data, &mds);
926         if (rc) {
927                 CERROR("Can't get target for allocating fid, "
928                        "rc %d\n", rc);
929                 RETURN(rc);
930         }
931
932         rc = __lmv_fid_alloc(lmv, fid, mds);
933         if (rc) {
934                 CERROR("Can't alloc new fid, rc %d\n", rc);
935                 RETURN(rc);
936         }
937
938         RETURN(rc);
939 }
940
941 static int lmv_fid_delete(struct obd_export *exp, const struct lu_fid *fid)
942 {
943         ENTRY;
944
945         LASSERT(exp && fid);
946         if (lmv_obj_delete(exp, fid)) {
947                 CDEBUG(D_OTHER, "lmv object "DFID" is destroyed.\n",
948                        PFID(fid));
949         }
950         RETURN(0);
951 }
952
953 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
954 {
955         struct lmv_obd *lmv = &obd->u.lmv;
956         struct lprocfs_static_vars lvars;
957         struct lmv_desc *desc;
958         int rc, i = 0;
959         ENTRY;
960
961         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
962                 CERROR("LMV setup requires a descriptor\n");
963                 RETURN(-EINVAL);
964         }
965
966         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
967         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
968                 CERROR("descriptor size wrong: %d > %d\n",
969                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
970                 RETURN(-EINVAL);
971         }
972
973         lmv->tgts_size = LMV_MAX_TGT_COUNT * sizeof(struct lmv_tgt_desc);
974
975         OBD_ALLOC(lmv->tgts, lmv->tgts_size);
976         if (lmv->tgts == NULL)
977                 RETURN(-ENOMEM);
978
979         for (i = 0; i < LMV_MAX_TGT_COUNT; i++) {
980                 sema_init(&lmv->tgts[i].ltd_fid_sem, 1);
981                 lmv->tgts[i].ltd_idx = i;
982         }
983
984         lmv->datas_size = LMV_MAX_TGT_COUNT * sizeof(struct obd_connect_data);
985
986         OBD_ALLOC(lmv->datas, lmv->datas_size);
987         if (lmv->datas == NULL)
988                 GOTO(out_free_tgts, rc = -ENOMEM);
989
990         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
991         lmv->desc.ld_tgt_count = 0;
992         lmv->desc.ld_active_tgt_count = 0;
993         lmv->max_cookiesize = 0;
994         lmv->max_def_easize = 0;
995         lmv->max_easize = 0;
996
997         spin_lock_init(&lmv->lmv_lock);
998         sema_init(&lmv->init_sem, 1);
999
1000         rc = lmv_obj_setup(obd);
1001         if (rc) {
1002                 CERROR("Can't setup LMV object manager, "
1003                        "error %d.\n", rc);
1004                 GOTO(out_free_datas, rc);
1005         }
1006
1007         lprocfs_lmv_init_vars(&lvars);
1008         lprocfs_obd_setup(obd, lvars.obd_vars);
1009 #ifdef LPROCFS
1010         {
1011                 struct proc_dir_entry *entry;
1012
1013                 entry = create_proc_entry("target_obd_status", 0444,
1014                                           obd->obd_proc_entry);
1015                 if (entry != NULL) {
1016                         entry->proc_fops = &lmv_proc_target_fops;
1017                         entry->data = obd;
1018                 }
1019        }
1020 #endif
1021         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1022                              LUSTRE_CLI_FLD_HASH_DHT);
1023         if (rc) {
1024                 CERROR("can't init FLD, err %d\n",
1025                        rc);
1026                 GOTO(out_free_datas, rc);
1027         }
1028
1029         RETURN(0);
1030
1031 out_free_datas:
1032         OBD_FREE(lmv->datas, lmv->datas_size);
1033         lmv->datas = NULL;
1034 out_free_tgts:
1035         OBD_FREE(lmv->tgts, lmv->tgts_size);
1036         lmv->tgts = NULL;
1037         return rc;
1038 }
1039
1040 static int lmv_cleanup(struct obd_device *obd)
1041 {
1042         struct lmv_obd *lmv = &obd->u.lmv;
1043         ENTRY;
1044
1045         fld_client_fini(&lmv->lmv_fld);
1046         lprocfs_obd_cleanup(obd);
1047         lmv_obj_cleanup(obd);
1048         OBD_FREE(lmv->datas, lmv->datas_size);
1049         OBD_FREE(lmv->tgts, lmv->tgts_size);
1050
1051         RETURN(0);
1052 }
1053
1054 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
1055 {
1056         struct lustre_cfg *lcfg = buf;
1057         struct obd_uuid tgt_uuid;
1058         int rc;
1059         ENTRY;
1060
1061         switch(lcfg->lcfg_command) {
1062         case LCFG_ADD_MDC:
1063                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(tgt_uuid.uuid))
1064                         GOTO(out, rc = -EINVAL);
1065
1066                 obd_str2uuid(&tgt_uuid, lustre_cfg_string(lcfg, 1));
1067                 rc = lmv_add_target(obd, &tgt_uuid);
1068                 GOTO(out, rc);
1069         default: {
1070                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1071                 GOTO(out, rc = -EINVAL);
1072         }
1073         }
1074 out:
1075         RETURN(rc);
1076 }
1077
1078 static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1079                       __u64 max_age, __u32 flags)
1080 {
1081         struct lmv_obd *lmv = &obd->u.lmv;
1082         struct obd_statfs *temp;
1083         int rc = 0, i;
1084         ENTRY;
1085
1086         rc = lmv_check_connect(obd);
1087         if (rc)
1088                 RETURN(rc);
1089
1090         OBD_ALLOC(temp, sizeof(*temp));
1091         if (temp == NULL)
1092                 RETURN(-ENOMEM);
1093
1094         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1095                 if (lmv->tgts[i].ltd_exp == NULL)
1096                         continue;
1097
1098                 rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, temp,
1099                                 max_age, flags);
1100                 if (rc) {
1101                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
1102                                lmv->tgts[i].ltd_exp->exp_obd->obd_name,
1103                                rc);
1104                         GOTO(out_free_temp, rc);
1105                 }
1106                 if (i == 0) {
1107                         *osfs = *temp;
1108                 } else {
1109                         osfs->os_bavail += temp->os_bavail;
1110                         osfs->os_blocks += temp->os_blocks;
1111                         osfs->os_ffree += temp->os_ffree;
1112                         osfs->os_files += temp->os_files;
1113                 }
1114         }
1115
1116         EXIT;
1117 out_free_temp:
1118         OBD_FREE(temp, sizeof(*temp));
1119         return rc;
1120 }
1121
1122 static int lmv_getstatus(struct obd_export *exp,
1123                          struct lu_fid *fid,
1124                          struct obd_capa **pc)
1125 {
1126         struct obd_device *obd = exp->exp_obd;
1127         struct lmv_obd *lmv = &obd->u.lmv;
1128         int rc;
1129         ENTRY;
1130
1131         rc = lmv_check_connect(obd);
1132         if (rc)
1133                 RETURN(rc);
1134
1135         rc = md_getstatus(lmv->tgts[0].ltd_exp, fid, pc);
1136
1137         RETURN(rc);
1138 }
1139
1140 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1141                         struct obd_capa *oc, obd_valid valid, const char *name,
1142                         const char *input, int input_size, int output_size,
1143                         int flags, struct ptlrpc_request **request)
1144 {
1145         struct obd_device *obd = exp->exp_obd;
1146         struct lmv_obd *lmv = &obd->u.lmv;
1147         struct obd_export *tgt_exp;
1148         int rc;
1149         ENTRY;
1150
1151         rc = lmv_check_connect(obd);
1152         if (rc)
1153                 RETURN(rc);
1154
1155         tgt_exp = lmv_find_export(lmv, fid);
1156         if (IS_ERR(tgt_exp))
1157                 RETURN(PTR_ERR(tgt_exp));
1158
1159         rc = md_getxattr(tgt_exp, fid, oc, valid, name, input, input_size,
1160                          output_size, flags, request);
1161
1162         RETURN(rc);
1163 }
1164
1165 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1166                         struct obd_capa *oc, obd_valid valid, const char *name,
1167                         const char *input, int input_size, int output_size,
1168                         int flags, __u32 suppgid,
1169                         struct ptlrpc_request **request)
1170 {
1171         struct obd_device *obd = exp->exp_obd;
1172         struct lmv_obd *lmv = &obd->u.lmv;
1173         struct obd_export *tgt_exp;
1174         int rc;
1175         ENTRY;
1176
1177         rc = lmv_check_connect(obd);
1178         if (rc)
1179                 RETURN(rc);
1180
1181         tgt_exp = lmv_find_export(lmv, fid);
1182         if (IS_ERR(tgt_exp))
1183                 RETURN(PTR_ERR(tgt_exp));
1184
1185         rc = md_setxattr(tgt_exp, fid, oc, valid, name,
1186                          input, input_size, output_size, flags, suppgid,
1187                          request);
1188
1189         RETURN(rc);
1190 }
1191
1192 static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
1193                        struct obd_capa *oc, obd_valid valid, int ea_size,
1194                        struct ptlrpc_request **request)
1195 {
1196         struct obd_device *obd = exp->exp_obd;
1197         struct lmv_obd *lmv = &obd->u.lmv;
1198         struct obd_export *tgt_exp;
1199         struct lmv_obj *obj;
1200         int rc, i;
1201         ENTRY;
1202
1203         rc = lmv_check_connect(obd);
1204         if (rc)
1205                 RETURN(rc);
1206
1207         tgt_exp = lmv_find_export(lmv, fid);
1208         if (IS_ERR(tgt_exp))
1209                 RETURN(PTR_ERR(tgt_exp));
1210
1211         rc = md_getattr(tgt_exp, fid, oc, valid, ea_size, request);
1212         if (rc)
1213                 RETURN(rc);
1214
1215         obj = lmv_obj_grab(obd, fid);
1216
1217         CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n", PFID(fid),
1218                obj ? "(split)" : "");
1219
1220         /*
1221          * If object is split, then we loop over all the slaves and gather size
1222          * attribute. In ideal world we would have to gather also mds field from
1223          * all slaves, as object is spread over the cluster and this is
1224          * definitely interesting information and it is not good to loss it,
1225          * but...
1226          */
1227         if (obj) {
1228                 struct mdt_body *body;
1229
1230                 if (*request == NULL) {
1231                         lmv_obj_put(obj);
1232                         RETURN(rc);
1233                 }
1234
1235                 body = req_capsule_server_get(&(*request)->rq_pill,
1236                                               &RMF_MDT_BODY);
1237                 LASSERT(body != NULL);
1238
1239                 lmv_obj_lock(obj);
1240
1241                 for (i = 0; i < obj->lo_objcount; i++) {
1242                         if (lmv->tgts[i].ltd_exp == NULL) {
1243                                 CWARN("%s: NULL export for %d\n",
1244                                       obd->obd_name, i);
1245                                 continue;
1246                         }
1247
1248                         /* skip master obj. */
1249                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid))
1250                                 continue;
1251
1252                         lmv_update_body(body, &obj->lo_inodes[i]);
1253                 }
1254
1255                 lmv_obj_unlock(obj);
1256                 lmv_obj_put(obj);
1257         }
1258
1259         RETURN(rc);
1260 }
1261
1262 static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1263                              ldlm_iterator_t it, void *data)
1264 {
1265         struct obd_device *obd = exp->exp_obd;
1266         struct lmv_obd *lmv = &obd->u.lmv;
1267         int i, rc;
1268         ENTRY;
1269
1270         rc = lmv_check_connect(obd);
1271         if (rc)
1272                 RETURN(rc);
1273
1274         CDEBUG(D_OTHER, "CBDATA for "DFID"\n", PFID(fid));
1275
1276         /*
1277          * With CMD every object can have two locks in different namespaces:
1278          * lookup lock in space of mds storing direntry and update/open lock in
1279          * space of mds storing inode.
1280          */
1281         for (i = 0; i < lmv->desc.ld_tgt_count; i++)
1282                 md_change_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
1283
1284         RETURN(0);
1285 }
1286
1287 static int lmv_close(struct obd_export *exp,
1288                      struct md_op_data *op_data,
1289                      struct md_open_data *mod,
1290                      struct ptlrpc_request **request)
1291 {
1292         struct obd_device *obd = exp->exp_obd;
1293         struct lmv_obd *lmv = &obd->u.lmv;
1294         struct obd_export *tgt_exp;
1295         int rc;
1296         ENTRY;
1297
1298         rc = lmv_check_connect(obd);
1299         if (rc)
1300                 RETURN(rc);
1301
1302         tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
1303         if (IS_ERR(tgt_exp))
1304                 RETURN(PTR_ERR(tgt_exp));
1305
1306         CDEBUG(D_OTHER, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1307         rc = md_close(tgt_exp, op_data, mod, request);
1308         RETURN(rc);
1309 }
1310
1311 /*
1312  * Called in the case MDS returns -ERESTART on create on open, what means that
1313  * directory is split and its LMV presentation object has to be updated.
1314  */
1315 int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
1316 {
1317         struct obd_device *obd = exp->exp_obd;
1318         struct lmv_obd *lmv = &obd->u.lmv;
1319         struct ptlrpc_request *req = NULL;
1320         struct obd_export *tgt_exp;
1321         struct lmv_obj *obj;
1322         struct lustre_md md;
1323         int mealen, rc;
1324         __u64 valid;
1325         ENTRY;
1326
1327         md.mea = NULL;
1328         mealen = lmv_get_easize(lmv);
1329
1330         valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
1331
1332         tgt_exp = lmv_find_export(lmv, fid);
1333         if (IS_ERR(tgt_exp))
1334                 RETURN(PTR_ERR(tgt_exp));
1335
1336         /* time to update mea of parent fid */
1337         rc = md_getattr(tgt_exp, fid, NULL, valid, mealen, &req);
1338         if (rc) {
1339                 CERROR("md_getattr() failed, error %d\n", rc);
1340                 GOTO(cleanup, rc);
1341         }
1342
1343         rc = md_get_lustre_md(tgt_exp, req, NULL, exp, &md);
1344         if (rc) {
1345                 CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
1346                 GOTO(cleanup, rc);
1347         }
1348
1349         if (md.mea == NULL)
1350                 GOTO(cleanup, rc = -ENODATA);
1351
1352         obj = lmv_obj_create(exp, fid, md.mea);
1353         if (IS_ERR(obj))
1354                 rc = PTR_ERR(obj);
1355         else
1356                 lmv_obj_put(obj);
1357
1358         obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
1359
1360         EXIT;
1361 cleanup:
1362         if (req)
1363                 ptlrpc_req_finished(req);
1364         return rc;
1365 }
1366
1367 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1368                const void *data, int datalen, int mode, __u32 uid,
1369                __u32 gid, __u32 cap_effective,  __u64 rdev,
1370                struct ptlrpc_request **request)
1371 {
1372         struct obd_device *obd = exp->exp_obd;
1373         struct lmv_obd *lmv = &obd->u.lmv;
1374         struct obd_export *tgt_exp;
1375         struct lmv_obj *obj;
1376         int rc, loop = 0;
1377         ENTRY;
1378
1379         rc = lmv_check_connect(obd);
1380         if (rc)
1381                 RETURN(rc);
1382
1383         if (!lmv->desc.ld_active_tgt_count)
1384                 RETURN(-EIO);
1385 repeat:
1386         ++loop;
1387         LASSERT(loop <= 2);
1388         obj = lmv_obj_grab(obd, &op_data->op_fid1);
1389         if (obj) {
1390                 int mea_idx;
1391
1392                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1393                                        op_data->op_name, op_data->op_namelen);
1394                 op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
1395                 op_data->op_bias &= ~MDS_CHECK_SPLIT;
1396                 op_data->op_mds = obj->lo_inodes[mea_idx].li_mds;
1397                 tgt_exp = lmv_get_export(lmv, op_data->op_mds);
1398                 lmv_obj_put(obj);
1399         } else {
1400                 struct lmv_tgt_desc *tgt;
1401
1402                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
1403                 op_data->op_bias |= MDS_CHECK_SPLIT;
1404                 op_data->op_mds = tgt->ltd_idx;
1405                 tgt_exp = tgt->ltd_exp;
1406         }
1407
1408         if (IS_ERR(tgt_exp))
1409                 RETURN(PTR_ERR(tgt_exp));
1410
1411         rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
1412         if (rc == -ERESTART)
1413                 goto repeat;
1414         else if (rc)
1415                 RETURN(rc);
1416
1417         CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->op_namelen,
1418                op_data->op_name, PFID(&op_data->op_fid1));
1419
1420         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1421         rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid,
1422                        cap_effective, rdev, request);
1423         if (rc == 0) {
1424                 if (*request == NULL)
1425                         RETURN(rc);
1426                 CDEBUG(D_OTHER, "created - "DFID"\n", PFID(&op_data->op_fid1));
1427         } else if (rc == -ERESTART) {
1428                 LASSERT(*request != NULL);
1429                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
1430                           "Got -ERESTART during create!\n");
1431                 ptlrpc_req_finished(*request);
1432                 *request = NULL;
1433
1434                 /*
1435                  * Directory got split. Time to update local object and repeat
1436                  * the request with proper MDS.
1437                  */
1438                 rc = lmv_handle_split(exp, &op_data->op_fid1);
1439                 if (rc == 0) {
1440                         rc = lmv_alloc_slave_fids(obd, &op_data->op_fid1,
1441                                                   op_data, &op_data->op_fid2);
1442                         if (rc)
1443                                 RETURN(rc);
1444                         goto repeat;
1445                 }
1446         }
1447         RETURN(rc);
1448 }
1449
1450 static int lmv_done_writing(struct obd_export *exp,
1451                             struct md_op_data *op_data,
1452                             struct md_open_data *mod)
1453 {
1454         struct obd_device *obd = exp->exp_obd;
1455         struct lmv_obd *lmv = &obd->u.lmv;
1456         struct obd_export *tgt_exp;
1457         int rc;
1458         ENTRY;
1459
1460         rc = lmv_check_connect(obd);
1461         if (rc)
1462                 RETURN(rc);
1463
1464         tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
1465         if (IS_ERR(tgt_exp))
1466                 RETURN(PTR_ERR(tgt_exp));
1467
1468         rc = md_done_writing(tgt_exp, op_data, mod);
1469         RETURN(rc);
1470 }
1471
1472 static int
1473 lmv_enqueue_slaves(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1474                    struct lookup_intent *it, struct md_op_data *op_data,
1475                    struct lustre_handle *lockh, void *lmm, int lmmsize)
1476 {
1477         struct obd_device *obd = exp->exp_obd;
1478         struct lmv_obd *lmv = &obd->u.lmv;
1479         struct lmv_stripe_md *mea = op_data->op_mea1;
1480         struct md_op_data *op_data2;
1481         struct obd_export *tgt_exp;
1482         int i, rc = 0;
1483         ENTRY;
1484
1485         OBD_ALLOC_PTR(op_data2);
1486         if (op_data2 == NULL)
1487                 RETURN(-ENOMEM);
1488
1489         LASSERT(mea != NULL);
1490         for (i = 0; i < mea->mea_count; i++) {
1491                 memset(op_data2, 0, sizeof(*op_data2));
1492                 op_data2->op_fid1 = mea->mea_ids[i];
1493                 op_data2->op_bias = 0;
1494
1495                 tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1);
1496                 if (IS_ERR(tgt_exp))
1497                         GOTO(cleanup, rc = PTR_ERR(tgt_exp));
1498
1499                 if (tgt_exp == NULL)
1500                         continue;
1501
1502                 rc = md_enqueue(tgt_exp, einfo, it, op_data2,
1503                                 lockh + i, lmm, lmmsize, 0);
1504
1505                 CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n",
1506                        PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
1507
1508                 if (rc)
1509                         GOTO(cleanup, rc);
1510
1511                 if (it->d.lustre.it_data) {
1512                         struct ptlrpc_request *req;
1513                         req = (struct ptlrpc_request *)it->d.lustre.it_data;
1514                         ptlrpc_req_finished(req);
1515                 }
1516
1517                 if (it->d.lustre.it_status)
1518                         GOTO(cleanup, rc = it->d.lustre.it_status);
1519         }
1520
1521         EXIT;
1522 cleanup:
1523         OBD_FREE_PTR(op_data2);
1524
1525         if (rc != 0) {
1526                 /* drop all taken locks */
1527                 while (--i >= 0) {
1528                         if (lockh[i].cookie)
1529                                 ldlm_lock_decref(lockh + i, einfo->ei_mode);
1530                         lockh[i].cookie = 0;
1531                 }
1532         }
1533         return rc;
1534 }
1535
1536 static int
1537 lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1538                    struct lookup_intent *it, struct md_op_data *op_data,
1539                    struct lustre_handle *lockh, void *lmm, int lmmsize,
1540                    int extra_lock_flags)
1541 {
1542         struct ptlrpc_request *req = it->d.lustre.it_data;
1543         struct obd_device *obd = exp->exp_obd;
1544         struct lmv_obd *lmv = &obd->u.lmv;
1545         struct lustre_handle plock;
1546         struct obd_export *tgt_exp;
1547         struct md_op_data *rdata;
1548         struct lu_fid fid_copy;
1549         struct mdt_body *body;
1550         int rc = 0, pmode;
1551         ENTRY;
1552
1553         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1554         LASSERT(body != NULL);
1555
1556         if (!(body->valid & OBD_MD_MDS))
1557                 RETURN(0);
1558
1559         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID" -> "DFID"\n",
1560                LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
1561
1562         /* We got LOOKUP lock, but we really need attrs */
1563         pmode = it->d.lustre.it_lock_mode;
1564         LASSERT(pmode != 0);
1565         memcpy(&plock, lockh, sizeof(plock));
1566         it->d.lustre.it_lock_mode = 0;
1567         it->d.lustre.it_data = NULL;
1568         fid_copy = body->fid1;
1569
1570         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
1571         ptlrpc_req_finished(req);
1572
1573         tgt_exp = lmv_find_export(lmv, &fid_copy);
1574         if (IS_ERR(tgt_exp))
1575                 GOTO(out, rc = PTR_ERR(tgt_exp));
1576
1577         OBD_ALLOC_PTR(rdata);
1578         if (rdata == NULL)
1579                 GOTO(out, rc = -ENOMEM);
1580
1581         rdata->op_fid1 = fid_copy;
1582         rdata->op_bias = MDS_CROSS_REF;
1583
1584         rc = md_enqueue(tgt_exp, einfo, it, rdata, lockh,
1585                         lmm, lmmsize, extra_lock_flags);
1586         OBD_FREE_PTR(rdata);
1587         EXIT;
1588 out:
1589         ldlm_lock_decref(&plock, pmode);
1590         return rc;
1591 }
1592
1593 static int
1594 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1595             struct lookup_intent *it, struct md_op_data *op_data,
1596             struct lustre_handle *lockh, void *lmm, int lmmsize,
1597             int extra_lock_flags)
1598 {
1599         struct obd_device *obd = exp->exp_obd;
1600         struct lmv_obd *lmv = &obd->u.lmv;
1601         struct obd_export *tgt_exp = NULL;
1602         struct lmv_obj *obj;
1603         int rc;
1604         ENTRY;
1605
1606         rc = lmv_check_connect(obd);
1607         if (rc)
1608                 RETURN(rc);
1609
1610         if (op_data->op_mea1 && it->it_op == IT_UNLINK) {
1611                 rc = lmv_enqueue_slaves(exp, einfo, it, op_data,
1612                                         lockh, lmm, lmmsize);
1613                 RETURN(rc);
1614         }
1615
1616         if (op_data->op_namelen) {
1617                 obj = lmv_obj_grab(obd, &op_data->op_fid1);
1618                 if (obj) {
1619                         int mea_idx;
1620
1621                         /* directory is split. look for right mds for this
1622                          * name */
1623                         mea_idx = raw_name2idx(obj->lo_hashtype,
1624                                                obj->lo_objcount,
1625                                                (char *)op_data->op_name,
1626                                                op_data->op_namelen);
1627                         op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
1628                         tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
1629                         lmv_obj_put(obj);
1630                 }
1631         }
1632
1633         if (tgt_exp == NULL)
1634                 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
1635         if (IS_ERR(tgt_exp))
1636                 RETURN(PTR_ERR(tgt_exp));
1637
1638         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it),
1639                PFID(&op_data->op_fid1));
1640
1641         rc = md_enqueue(tgt_exp, einfo, it, op_data, lockh,
1642                         lmm, lmmsize, extra_lock_flags);
1643
1644         if (rc == 0 && it->it_op == IT_OPEN)
1645                 rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
1646                                         lmm, lmmsize, extra_lock_flags);
1647         RETURN(rc);
1648 }
1649
1650 static int
1651 lmv_getattr_name(struct obd_export *exp, const struct lu_fid *fid,
1652                  struct obd_capa *oc, const char *filename, int namelen,
1653                  obd_valid valid, int ea_size, __u32 suppgid,
1654                  struct ptlrpc_request **request)
1655 {
1656         struct obd_device *obd = exp->exp_obd;
1657         struct lmv_obd *lmv = &obd->u.lmv;
1658         struct lu_fid rid = *fid;
1659         struct obd_export *tgt_exp;
1660         struct mdt_body *body;
1661         struct lmv_obj *obj;
1662         int rc, loop = 0;
1663         ENTRY;
1664
1665         rc = lmv_check_connect(obd);
1666         if (rc)
1667                 RETURN(rc);
1668
1669 repeat:
1670         ++loop;
1671         LASSERT(loop <= 2);
1672         obj = lmv_obj_grab(obd, &rid);
1673         if (obj) {
1674                 int mea_idx;
1675
1676                 /* Directory is split. Look for right mds for this name */
1677                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1678                                        filename, namelen - 1);
1679                 rid = obj->lo_inodes[mea_idx].li_fid;
1680                 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
1681                 lmv_obj_put(obj);
1682                 valid &= ~OBD_MD_FLCKSPLIT;
1683         } else {
1684                 tgt_exp = lmv_find_export(lmv, &rid);
1685                 valid |= OBD_MD_FLCKSPLIT;
1686         }
1687         if (IS_ERR(tgt_exp))
1688                 RETURN(PTR_ERR(tgt_exp));
1689
1690         CDEBUG(D_OTHER, "getattr_name for %*s on "DFID" -> "DFID"\n",
1691                namelen, filename, PFID(fid), PFID(&rid));
1692
1693         rc = md_getattr_name(tgt_exp, &rid, oc, filename, namelen, valid,
1694                              ea_size, suppgid, request);
1695         if (rc == 0) {
1696                 body = req_capsule_server_get(&(*request)->rq_pill,
1697                                               &RMF_MDT_BODY);
1698                 LASSERT(body != NULL);
1699
1700                 if (body->valid & OBD_MD_MDS) {
1701                         struct ptlrpc_request *req = NULL;
1702
1703                         rid = body->fid1;
1704                         CDEBUG(D_OTHER, "request attrs for "DFID"\n",
1705                                PFID(&rid));
1706
1707                         tgt_exp = lmv_find_export(lmv, &rid);
1708                         if (IS_ERR(tgt_exp)) {
1709                                 ptlrpc_req_finished(*request);
1710                                 RETURN(PTR_ERR(tgt_exp));
1711                         }
1712
1713                         rc = md_getattr_name(tgt_exp, &rid, NULL, NULL, 1,
1714                                              valid | OBD_MD_FLCROSSREF,
1715                                              ea_size, suppgid, &req);
1716                         ptlrpc_req_finished(*request);
1717                         *request = req;
1718                 }
1719         } else if (rc == -ERESTART) {
1720                 LASSERT(*request != NULL);
1721                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
1722                           "Got -ERESTART during getattr!\n");
1723                 ptlrpc_req_finished(*request);
1724                 *request = NULL;
1725
1726                 /*
1727                  * Directory got split. Time to update local object and repeat
1728                  * the request with proper MDS.
1729                  */
1730                 rc = lmv_handle_split(exp, &rid);
1731                 if (rc == 0)
1732                         goto repeat;
1733         }
1734         RETURN(rc);
1735 }
1736
/*
 * Map one of the MF_MDC_CANCEL_FID1..4 flags to the address of the matching
 * fid slot (op_fid1..op_fid4) of @op_data.  Evaluates to NULL for any other
 * value of @fl.
 *
 * Both arguments are parenthesized so that expression arguments (pointer
 * arithmetic, conditional expressions) expand correctly.  Note that @fl may
 * be evaluated up to four times, so it must not have side effects.
 */
#define md_op_data_fid(op_data, fl)                             \
        ((fl) == MF_MDC_CANCEL_FID1 ? &(op_data)->op_fid1 :     \
         (fl) == MF_MDC_CANCEL_FID2 ? &(op_data)->op_fid2 :     \
         (fl) == MF_MDC_CANCEL_FID3 ? &(op_data)->op_fid3 :     \
         (fl) == MF_MDC_CANCEL_FID4 ? &(op_data)->op_fid4 :     \
         NULL)
1743
1744 /* @tgt_exp is the export the metadata request is sent.
1745  * @fid_exp is the export the cancel should be sent for the current fid.
1746  * if @fid_exp is NULL, the export is found for the current fid.
1747  * @op_data keeps the current fid, which is pointed through @flag.
1748  * @mode, @bits -- lock match parameters. */
1749 static int lmv_early_cancel(struct lmv_obd *lmv, struct obd_export *tgt_exp,
1750                             struct obd_export *fid_exp,
1751                             struct md_op_data *op_data,
1752                             ldlm_mode_t mode, int bits, int flag)
1753 {
1754         struct lu_fid *fid = md_op_data_fid(op_data, flag);
1755         ldlm_policy_data_t policy = {{0}};
1756         int rc = 0;
1757         ENTRY;
1758
1759         if (!fid_is_sane(fid))
1760                 RETURN(0);
1761         
1762         if (fid_exp == NULL)
1763                 fid_exp = lmv_find_export(lmv, fid);
1764
1765         if (tgt_exp == fid_exp) {
1766                 /* The export is the same as on the target server, cancel 
1767                  * will be sent along with the main metadata operation. */
1768                 op_data->op_flags |= flag;
1769                 RETURN(0);
1770         }
1771
1772         policy.l_inodebits.bits = bits;
1773         rc = md_cancel_unused(fid_exp, fid, &policy, mode, LDLM_FL_ASYNC, NULL);
1774         RETURN(rc);
1775 }
1776
#ifdef EARLY_CANCEL_FOR_STRIPED_DIR_IS_READY
/*
 * Check if the fid in @op_data pointed to by @flag is of the same export(s)
 * as @tgt_exp.  Early cancels for a matching export will be sent later by
 * mdc code (the flag is recorded in op_data->op_flags); for every other
 * stripe export md_cancel_unused() is called here.
 *
 * If no lmv_obj is cached for the fid, this falls back to
 * lmv_early_cancel() with export lookup by fid.
 *
 * Returns 0 on success or the first error met (export lookup or
 * md_cancel_unused()).
 */
static int lmv_early_cancel_stripes(struct obd_export *exp,
                                    struct obd_export *tgt_exp,
                                    struct md_op_data *op_data,
                                    ldlm_mode_t mode, int bits, int flag)
{
        struct lu_fid *fid = md_op_data_fid(op_data, flag);
        struct obd_device *obd = exp->exp_obd;
        struct lmv_obd *lmv = &obd->u.lmv;
        struct obd_export *st_exp;
        struct lmv_obj *obj;
        int rc = 0;
        ENTRY;

        if (!fid_is_sane(fid))
                RETURN(0);

        obj = lmv_obj_grab(obd, fid);
        if (obj) {
                ldlm_policy_data_t policy = {{0}};
                struct lu_fid *st_fid;
                int i;

                policy.l_inodebits.bits = bits;
                for (i = 0; i < obj->lo_objcount; i++) {
                        st_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
                        /* lmv_get_export() results are checked with IS_ERR
                         * elsewhere in this file; do the same here before
                         * using the export. */
                        if (IS_ERR(st_exp)) {
                                rc = PTR_ERR(st_exp);
                                break;
                        }
                        st_fid = &obj->lo_inodes[i].li_fid;
                        if (tgt_exp != st_exp) {
                                rc = md_cancel_unused(st_exp, st_fid, &policy,
                                                      mode, LDLM_FL_ASYNC,
                                                      NULL);
                                if (rc)
                                        break;
                        } else {
                                /* Some export matches to @tgt_exp, do cancel
                                 * for its fid in mdc */
                                *fid = *st_fid;
                                op_data->op_flags |= flag;
                        }
                }
                lmv_obj_put(obj);
        } else {
                rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data,
                                      mode, bits, flag);
        }
        RETURN(rc);
}
#endif
1828
1829 /*
1830  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1831  * op_data->op_fid2
1832  */
1833 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1834                     struct ptlrpc_request **request)
1835 {
1836         struct obd_device *obd = exp->exp_obd;
1837         struct lmv_obd *lmv = &obd->u.lmv;
1838         struct obd_export *tgt_exp;
1839         struct lmv_obj *obj;
1840         int rc, loop = 0;
1841         mdsno_t mds;
1842         ENTRY;
1843
1844         rc = lmv_check_connect(obd);
1845         if (rc)
1846                 RETURN(rc);
1847
1848 repeat:
1849         ++loop;
1850         LASSERT(loop <= 2);
1851         if (op_data->op_namelen != 0) {
1852                 int mea_idx;
1853
1854                 /* Usual link request */
1855                 obj = lmv_obj_grab(obd, &op_data->op_fid2);
1856                 if (obj) {
1857                         mea_idx = raw_name2idx(obj->lo_hashtype,
1858                                                obj->lo_objcount,
1859                                                op_data->op_name,
1860                                                op_data->op_namelen);
1861                         op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
1862                         mds = obj->lo_inodes[mea_idx].li_mds;
1863                         lmv_obj_put(obj);
1864                 } else {
1865                         rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
1866                         if (rc)
1867                                 RETURN(rc);
1868                 }
1869
1870                 CDEBUG(D_OTHER,"link "DFID":%*s to "DFID"\n",
1871                        PFID(&op_data->op_fid2), op_data->op_namelen,
1872                        op_data->op_name, PFID(&op_data->op_fid1));
1873         } else {
1874                 rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
1875                 if (rc)
1876                         RETURN(rc);
1877
1878                 /* request from MDS to acquire i_links for inode by fid1 */
1879                 CDEBUG(D_OTHER, "inc i_nlinks for "DFID"\n",
1880                        PFID(&op_data->op_fid1));
1881         }
1882
1883         CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
1884                mds, PFID(&op_data->op_fid1));
1885
1886         op_data->op_fsuid = current->fsuid;
1887         op_data->op_fsgid = current->fsgid;
1888         op_data->op_cap   = current->cap_effective;
1889
1890         tgt_exp = lmv->tgts[mds].ltd_exp;
1891         if (op_data->op_namelen) {
1892                 op_data->op_flags |= MF_MDC_CANCEL_FID2;
1893                 /* Cancel UPDATE lock on child (fid1). */
1894                 rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, LCK_EX,
1895                                       MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
1896         }
1897         if (rc == 0)
1898                 rc = md_link(tgt_exp, op_data, request);
1899         if (rc == -ERESTART) {
1900                 LASSERT(*request != NULL);
1901                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
1902                           "Got -ERESTART during link!\n");
1903                 ptlrpc_req_finished(*request);
1904                 *request = NULL;
1905
1906                 /*
1907                  * Directory got split. Time to update local object and repeat
1908                  * the request with proper MDS.
1909                  */
1910                 rc = lmv_handle_split(exp, &op_data->op_fid2);
1911                 if (rc == 0)
1912                         goto repeat;
1913         }
1914
1915         RETURN(rc);
1916 }
1917
1918 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1919                       const char *old, int oldlen, const char *new, int newlen,
1920                       struct ptlrpc_request **request)
1921 {
1922         struct obd_export *tgt_exp = NULL, *src_exp;
1923         struct obd_device *obd = exp->exp_obd;
1924         struct lmv_obd *lmv = &obd->u.lmv;
1925         int rc, mea_idx, loop = 0;
1926         struct lmv_obj *obj;
1927         mdsno_t mds1, mds2;
1928         ENTRY;
1929
1930         CDEBUG(D_OTHER, "rename %*s in "DFID" to %*s in "DFID"\n",
1931                oldlen, old, PFID(&op_data->op_fid1),
1932                newlen, new, PFID(&op_data->op_fid2));
1933
1934         rc = lmv_check_connect(obd);
1935         if (rc)
1936                 RETURN(rc);
1937
1938         if (oldlen == 0) {
1939                 /*
1940                  * MDS with old dir entry is asking another MDS to create name
1941                  * there.
1942                  */
1943                 CDEBUG(D_OTHER,
1944                        "create %*s(%d/%d) in "DFID" pointing "
1945                        "to "DFID"\n", newlen, new, oldlen, newlen,
1946                        PFID(&op_data->op_fid2), PFID(&op_data->op_fid1));
1947
1948                 rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds1);
1949                 if (rc)
1950                         RETURN(rc);
1951
1952                 /*
1953                  * Target directory can be split, sowe should forward request to
1954                  * the right MDS.
1955                  */
1956                 obj = lmv_obj_grab(obd, &op_data->op_fid2);
1957                 if (obj) {
1958                         mea_idx = raw_name2idx(obj->lo_hashtype,
1959                                                obj->lo_objcount,
1960                                                (char *)new, newlen);
1961                         op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
1962                         CDEBUG(D_OTHER, "Parent obj "DFID"\n",
1963                                PFID(&op_data->op_fid2));
1964                         lmv_obj_put(obj);
1965                 }
1966                 goto request;
1967         }
1968
1969 repeat:
1970         ++loop;
1971         LASSERT(loop <= 2);
1972         obj = lmv_obj_grab(obd, &op_data->op_fid1);
1973         if (obj) {
1974                 /*
1975                  * directory is already split, so we have to forward request to
1976                  * the right MDS.
1977                  */
1978                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1979                                        (char *)old, oldlen);
1980                 op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
1981                 mds1 = obj->lo_inodes[mea_idx].li_mds;
1982                 CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid1));
1983                 lmv_obj_put(obj);
1984         } else {
1985                 rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds1);
1986                 if (rc)
1987                         RETURN(rc);
1988         }
1989
1990         obj = lmv_obj_grab(obd, &op_data->op_fid2);
1991         if (obj) {
1992                 /*
1993                  * Directory is already split, so we have to forward request to
1994                  * the right MDS.
1995                  */
1996                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
1997                                        (char *)new, newlen);
1998
1999                 mds2 = obj->lo_inodes[mea_idx].li_mds;
2000                 op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
2001                 CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid2));
2002                 lmv_obj_put(obj);
2003         } else {
2004                 rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds2);
2005                 if (rc)
2006                         RETURN(rc);
2007         }
2008
2009 request:
2010         op_data->op_fsuid = current->fsuid;
2011         op_data->op_fsgid = current->fsgid;
2012         op_data->op_cap   = current->cap_effective;
2013
2014         src_exp = lmv_get_export(lmv, mds1);
2015         tgt_exp = lmv_get_export(lmv, mds2);
2016         if (oldlen) {
2017                 /* LOOKUP lock on src child (fid3) should also be cancelled for
2018                  * src_exp in mdc_rename. */
2019                 op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2020
2021                 /* Cancel UPDATE locks on tgt parent (fid2), tgt_exp is its
2022                  * own export. */
2023                 rc = lmv_early_cancel(lmv, src_exp, tgt_exp, op_data, LCK_EX,
2024                                       MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
2025
2026                 /* Cancel LOOKUP locks on tgt child (fid4) for parent tgt_exp.*/
2027                 if (rc == 0)
2028                         rc = lmv_early_cancel(lmv, src_exp, tgt_exp, op_data,
2029                                               LCK_EX, MDS_INODELOCK_LOOKUP,
2030                                               MF_MDC_CANCEL_FID4);
2031
2032                 /* XXX: the case when child is a striped dir is not supported.
2033                  * Only the master stripe has all locks cancelled early. */
2034                 /* Cancel all the locks on tgt child (fid4). */
2035                 if (rc == 0)
2036                         rc = lmv_early_cancel(lmv, src_exp, NULL, op_data,
2037                                               LCK_EX, MDS_INODELOCK_FULL,
2038                                               MF_MDC_CANCEL_FID4);
2039         }
2040
2041         if (rc == 0)
2042                 rc = md_rename(src_exp, op_data, old, oldlen,
2043                                new, newlen, request);
2044         if (rc == -ERESTART) {
2045                 LASSERT(*request != NULL);
2046                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
2047                           "Got -ERESTART during rename!\n");
2048                 ptlrpc_req_finished(*request);
2049                 *request = NULL;
2050
2051                 /*
2052                  * Directory got split. Time to update local object and repeat
2053                  * the request with proper MDS.
2054                  */
2055                 rc = lmv_handle_split(exp, &op_data->op_fid1);
2056                 if (rc == 0)
2057                         goto repeat;
2058         }
2059         RETURN(rc);
2060 }
2061
2062 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
2063                        void *ea, int ealen, void *ea2, int ea2len,
2064                        struct ptlrpc_request **request,
2065                        struct md_open_data **mod)
2066 {
2067         struct obd_device *obd = exp->exp_obd;
2068         struct lmv_obd *lmv = &obd->u.lmv;
2069         struct ptlrpc_request *req;
2070         struct obd_export *tgt_exp;
2071         struct lmv_obj *obj;
2072         int rc = 0, i;
2073         ENTRY;
2074
2075         rc = lmv_check_connect(obd);
2076         if (rc)
2077                 RETURN(rc);
2078
2079         obj = lmv_obj_grab(obd, &op_data->op_fid1);
2080
2081         CDEBUG(D_OTHER, "SETATTR for "DFID", valid 0x%x%s\n",
2082                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid,
2083                obj ? ", split" : "");
2084
2085         op_data->op_flags |= MF_MDC_CANCEL_FID1;
2086         if (obj) {
2087                 for (i = 0; i < obj->lo_objcount; i++) {
2088                         op_data->op_fid1 = obj->lo_inodes[i].li_fid;
2089
2090                         tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
2091                         if (IS_ERR(tgt_exp)) {
2092                                 rc = PTR_ERR(tgt_exp);
2093                                 break;
2094                         }
2095
2096                         rc = md_setattr(tgt_exp, op_data, ea, ealen,
2097                                         ea2, ea2len, &req, mod);
2098
2099                         if (lu_fid_eq(&obj->lo_fid, &obj->lo_inodes[i].li_fid)) {
2100                                 /*
2101                                  * this is master object and this request should
2102                                  * be returned back to llite.
2103                                  */
2104                                 *request = req;
2105                         } else {
2106                                 ptlrpc_req_finished(req);
2107                         }
2108
2109                         if (rc)
2110                                 break;
2111                 }
2112                 lmv_obj_put(obj);
2113         } else {
2114                 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
2115                 if (IS_ERR(tgt_exp))
2116                         RETURN(PTR_ERR(tgt_exp));
2117
2118                 rc = md_setattr(tgt_exp, op_data, ea, ealen, ea2,
2119                                 ea2len, request, mod);
2120         }
2121         RETURN(rc);
2122 }
2123
2124 static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
2125                     struct obd_capa *oc, struct ptlrpc_request **request)
2126 {
2127         struct obd_device *obd = exp->exp_obd;
2128         struct lmv_obd *lmv = &obd->u.lmv;
2129         struct obd_export *tgt_exp;
2130         int rc;
2131         ENTRY;
2132
2133         rc = lmv_check_connect(obd);
2134         if (rc)
2135                 RETURN(rc);
2136
2137         tgt_exp = lmv_find_export(lmv, fid);
2138         if (IS_ERR(tgt_exp))
2139                 RETURN(PTR_ERR(tgt_exp));
2140
2141         rc = md_sync(tgt_exp, fid, oc, request);
2142         RETURN(rc);
2143 }
2144
2145 /* main purpose of LMV blocking ast is to remove split directory LMV
2146  * presentation object (struct lmv_obj) attached to the lock being revoked. */
2147 int lmv_blocking_ast(struct ldlm_lock *lock,
2148                      struct ldlm_lock_desc *desc,
2149                      void *data, int flag)
2150 {
2151         struct lustre_handle lockh;
2152         struct lmv_obj *obj;
2153         int rc;
2154         ENTRY;
2155
2156         switch (flag) {
2157         case LDLM_CB_BLOCKING:
2158                 ldlm_lock2handle(lock, &lockh);
2159                 rc = ldlm_cli_cancel(&lockh);
2160                 if (rc < 0) {
2161                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
2162                         RETURN(rc);
2163                 }
2164                 break;
2165         case LDLM_CB_CANCELING:
2166                 /* time to drop cached attrs for dirobj */
2167                 obj = lock->l_ast_data;
2168                 if (obj) {
2169                         CDEBUG(D_OTHER, "cancel %s on "LPU64"/"LPU64
2170                                ", master "DFID"\n",
2171                                lock->l_resource->lr_name.name[3] == 1 ?
2172                                "LOOKUP" : "UPDATE",
2173                                lock->l_resource->lr_name.name[0],
2174                                lock->l_resource->lr_name.name[1],
2175                                PFID(&obj->lo_fid));
2176                         lmv_obj_put(obj);
2177                 }
2178                 break;
2179         default:
2180                 LBUG();
2181         }
2182         RETURN(0);
2183 }
2184
2185 static void lmv_hash_adjust(__u32 *hash, __u32 hash_adj)
2186 {
2187         __u32 val;
2188
2189         val = le32_to_cpu(*hash);
2190         if (val < hash_adj)
2191                 val += MAX_HASH_SIZE;
2192         if (val != DIR_END_OFF)
2193                 *hash = cpu_to_le32(val - hash_adj);
2194 }
2195
2196 static __u32 lmv_node_rank(struct obd_export *exp, const struct lu_fid *fid)
2197 {
2198         __u64 id;
2199         struct obd_import *imp;
2200
2201         /*
2202          * XXX Hack: to get nid we assume that underlying obd device is mdc.
2203          */
2204         imp  = class_exp2cliimp(exp);
2205         id   = imp->imp_connection->c_self + fid_flatten(fid);
2206
2207         CDEBUG(D_INFO, "node rank: %llx "DFID" %llx %llx\n",
2208                imp->imp_connection->c_self, PFID(fid), id, id ^ (id >> 32));
2209
2210         return id ^ (id >> 32);
2211 }
2212
2213 static int lmv_readpage(struct obd_export *exp, const struct lu_fid *fid,
2214                         struct obd_capa *oc, __u64 offset64, struct page *page,
2215                         struct ptlrpc_request **request)
2216 {
2217         struct obd_device *obd = exp->exp_obd;
2218         struct lmv_obd *lmv = &obd->u.lmv;
2219         struct obd_export *tgt_exp;
2220         struct lu_fid rid = *fid;
2221         struct lmv_obj *obj;
2222         __u32 offset0;
2223         __u32 offset;
2224         __u32 hash_adj = 0;
2225         __u32 rank = 0;
2226         __u32 seg_size = 0;
2227         int tgt = 0;
2228         int tgt0 = 0;
2229         int rc;
2230         int nr = 0;
2231         ENTRY;
2232
2233         offset0 = offset = offset64;
2234         /*
2235          * Check that offset is representable by 32bit number.
2236          */
2237         LASSERT((__u64)offset == offset64);
2238
2239         rc = lmv_check_connect(obd);
2240         if (rc)
2241                 RETURN(rc);
2242
2243         CDEBUG(D_INFO, "READPAGE at %x from "DFID"\n", offset, PFID(&rid));
2244
2245         obj = lmv_obj_grab(obd, fid);
2246         if (obj) {
2247                 struct lmv_inode *loi;
2248
2249                 lmv_obj_lock(obj);
2250
2251                 nr       = obj->lo_objcount;
2252                 LASSERT(nr > 0);
2253                 seg_size = MAX_HASH_SIZE / nr;
2254                 loi      = obj->lo_inodes;
2255                 rank     = lmv_node_rank(lmv_get_export(lmv, loi[0].li_mds),
2256                                          fid) % nr;
2257                 tgt0     = (offset / seg_size) % nr;
2258                 tgt      = (tgt0 + rank) % nr;
2259
2260                 if (tgt < tgt0)
2261                         /*
2262                          * Wrap around.
2263                          *
2264                          * Last segment has unusual length due to division
2265                          * rounding.
2266                          */
2267                         hash_adj = MAX_HASH_SIZE - seg_size * nr;
2268                 else
2269                         hash_adj = 0;
2270
2271                 hash_adj += rank * seg_size;
2272
2273                 CDEBUG(D_INFO, "hash_adj: %x %x %x/%x -> %x/%x\n",
2274                        rank, hash_adj, offset, tgt0, offset + hash_adj, tgt);
2275
2276                 offset = (offset + hash_adj) % MAX_HASH_SIZE;
2277                 rid = obj->lo_inodes[tgt].li_fid;
2278                 tgt_exp = lmv_get_export(lmv, loi[tgt].li_mds);
2279
2280                 CDEBUG(D_INFO, "forward to "DFID" with offset %lu i %d\n",
2281                        PFID(&rid), (unsigned long)offset, tgt);
2282         } else
2283                 tgt_exp = lmv_find_export(lmv, &rid);
2284
2285         if (IS_ERR(tgt_exp))
2286                 GOTO(cleanup, rc = PTR_ERR(tgt_exp));
2287
2288         rc = md_readpage(tgt_exp, &rid, oc, offset, page, request);
2289         if (rc)
2290                 GOTO(cleanup, rc);
2291         if (obj) {
2292                 struct lu_dirpage *dp;
2293                 struct lu_dirent  *ent;
2294
2295                 dp = cfs_kmap(page);
2296
2297                 lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
2298                 lmv_hash_adjust(&dp->ldp_hash_end,   hash_adj);
2299                 LASSERT(cpu_to_le32(dp->ldp_hash_start) <= offset0);
2300
2301                 for (ent = lu_dirent_start(dp); ent != NULL;
2302                      ent = lu_dirent_next(ent))
2303                         lmv_hash_adjust(&ent->lde_hash, hash_adj);
2304
2305                 if (tgt0 != nr - 1) {
2306                         __u32 end;
2307
2308                         end = le32_to_cpu(dp->ldp_hash_end);
2309                         if (end == DIR_END_OFF) {
2310                                 dp->ldp_hash_end = cpu_to_le32(seg_size *
2311                                                                (tgt0 + 1));
2312                                 CDEBUG(D_INFO, ""DFID" reset end %x tgt %d\n",
2313                                        PFID(&rid),
2314                                        le32_to_cpu(dp->ldp_hash_end), tgt);
2315                         }
2316                 }
2317                 cfs_kunmap(page);
2318         }
2319         /*
2320          * Here we could remove "." and ".." from all pages which at not from
2321          * master. But MDS has only "." and ".." for master dir.
2322          */
2323         EXIT;
2324 cleanup:
2325         if (obj) {
2326                 lmv_obj_unlock(obj);
2327                 lmv_obj_put(obj);
2328         }
2329         return rc;
2330 }
2331
2332 static int lmv_unlink_slaves(struct obd_export *exp,
2333                              struct md_op_data *op_data,
2334                              struct ptlrpc_request **req)
2335 {
2336         struct obd_device *obd = exp->exp_obd;
2337         struct lmv_obd *lmv = &obd->u.lmv;
2338         struct lmv_stripe_md *mea = op_data->op_mea1;
2339         struct md_op_data *op_data2;
2340         struct obd_export *tgt_exp;
2341         int i, rc = 0;
2342         ENTRY;
2343
2344         OBD_ALLOC_PTR(op_data2);
2345         if (op_data2 == NULL)
2346                 RETURN(-ENOMEM);
2347
2348         op_data2->op_mode = S_IFDIR;
2349         op_data2->op_fsuid = current->fsuid;
2350         op_data2->op_fsgid = current->fsgid;
2351         op_data2->op_bias = 0;
2352
2353         LASSERT(mea != NULL);
2354         for (i = 0; i < mea->mea_count; i++) {
2355                 memset(op_data2, 0, sizeof(*op_data2));
2356                 op_data2->op_fid1 = mea->mea_ids[i];
2357                 tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1);
2358                 if (IS_ERR(tgt_exp))
2359                         GOTO(out_free_op_data2, rc = PTR_ERR(tgt_exp));
2360
2361                 if (tgt_exp == NULL)
2362                         continue;
2363
2364                 rc = md_unlink(tgt_exp, op_data2, req);
2365
2366                 CDEBUG(D_OTHER, "unlink slave "DFID" -> %d\n",
2367                        PFID(&mea->mea_ids[i]), rc);
2368
2369                 if (*req) {
2370                         ptlrpc_req_finished(*req);
2371                         *req = NULL;
2372                 }
2373                 if (rc)
2374                         GOTO(out_free_op_data2, rc);
2375         }
2376
2377         EXIT;
2378 out_free_op_data2:
2379         OBD_FREE_PTR(op_data2);
2380         return rc;
2381 }
2382
2383 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2384                       struct ptlrpc_request **request)
2385 {
2386         struct obd_device *obd = exp->exp_obd;
2387         struct lmv_obd *lmv = &obd->u.lmv;
2388         struct obd_export *tgt_exp = NULL;
2389         struct lmv_obj *obj;
2390         int rc, loop = 0;
2391         ENTRY;
2392
2393         rc = lmv_check_connect(obd);
2394         if (rc)
2395                 RETURN(rc);
2396
2397         if (op_data->op_namelen == 0 && op_data->op_mea1 != NULL) {
2398                 /* mds asks to remove slave objects */
2399                 rc = lmv_unlink_slaves(exp, op_data, request);
2400                 RETURN(rc);
2401         }
2402
2403 repeat:
2404         ++loop;
2405         LASSERT(loop <= 2);
2406         if (op_data->op_namelen != 0) {
2407                 int mea_idx;
2408
2409                 obj = lmv_obj_grab(obd, &op_data->op_fid1);
2410                 if (obj) {
2411                         mea_idx = raw_name2idx(obj->lo_hashtype,
2412                                                obj->lo_objcount,
2413                                                op_data->op_name,
2414                                                op_data->op_namelen);
2415                         op_data->op_bias &= ~MDS_CHECK_SPLIT;
2416                         op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
2417                         tgt_exp = lmv_get_export(lmv,
2418                                                  obj->lo_inodes[mea_idx].li_mds);
2419                         lmv_obj_put(obj);
2420                         CDEBUG(D_OTHER, "unlink '%*s' in "DFID" -> %u\n",
2421                                op_data->op_namelen, op_data->op_name,
2422                                PFID(&op_data->op_fid1), mea_idx);
2423                 }
2424         } else {
2425                 CDEBUG(D_OTHER, "drop i_nlink on "DFID"\n",
2426                        PFID(&op_data->op_fid1));
2427         }
2428         if (tgt_exp == NULL) {
2429                 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
2430                 if (IS_ERR(tgt_exp))
2431                         RETURN(PTR_ERR(tgt_exp));
2432                 op_data->op_bias |= MDS_CHECK_SPLIT;
2433         }
2434
2435         op_data->op_fsuid = current->fsuid;
2436         op_data->op_fsgid = current->fsgid;
2437         op_data->op_cap   = current->cap_effective;
2438
2439         /* If child's fid is given, cancel unused locks for it if it is from
2440          * another export than parent. */
2441         if (op_data->op_namelen) {
2442                 /* LOOKUP lock for child (fid3) should also be cancelled on 
2443                  * parent tgt_exp in mdc_unlink(). */
2444                 op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2445
2446                 /* XXX: the case when child is a striped dir is not supported.
2447                  * Only the master stripe has all locks cancelled early. */
2448                 /* Cancel FULL locks on child (fid3). */
2449                 rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, LCK_EX,
2450                                       MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
2451         }
2452         if (rc == 0)
2453                 rc = md_unlink(tgt_exp, op_data, request);
2454         if (rc == -ERESTART) {
2455                 LASSERT(*request != NULL);
2456                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
2457                           "Got -ERESTART during unlink!\n");
2458                 ptlrpc_req_finished(*request);
2459                 *request = NULL;
2460
2461                 /*
2462                  * Directory got split. Time to update local object and repeat
2463                  * the request with proper MDS.
2464                  */
2465                 rc = lmv_handle_split(exp, &op_data->op_fid1);
2466                 if (rc == 0)
2467                         goto repeat;
2468         }
2469         RETURN(rc);
2470 }
2471
/*
 * llog initialization hook for LMV.  The real setup is compiled out, so
 * this always reports success without touching its arguments.
 */
static int lmv_llog_init(struct obd_device *obd, int group,
                         struct obd_device *tgt, int count,
                         struct llog_catid *logid, struct obd_uuid *uuid)
{
#if 0
        /* Disabled llog context setup, kept for reference. */
        struct llog_ctxt *ctxt;
        int rc;
        ENTRY;

        LASSERT(group == OBD_LLOG_GROUP);
        rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
                        &llog_client_ops);
        if (rc == 0) {
                ctxt = llog_group_get_ctxt(&obd->obd_olg, LLOG_CONFIG_REPL_CTXT);
                llog_initiator_connect(ctxt, tgt);
                llog_ctxt_put(ctxt);
        }
        RETURN(rc);
#endif
        /* Nothing to set up while the block above is compiled out. */
        return 0;
}
2494
2495 static int lmv_llog_finish(struct obd_device *obd, int count)
2496 {
2497         struct llog_ctxt *ctxt;
2498         int rc = 0;
2499         ENTRY;
2500
2501         ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
2502         if (ctxt)
2503                 rc = llog_cleanup(ctxt);
2504
2505         RETURN(rc);
2506 }
2507
2508 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2509 {
2510         int rc = 0;
2511
2512         switch (stage) {
2513         case OBD_CLEANUP_EARLY:
2514                 /* XXX: here should be calling obd_precleanup() down to
2515                  * stack. */
2516                 break;
2517         case OBD_CLEANUP_SELF_EXP:
2518                 rc = obd_llog_finish(obd, 0);
2519                 if (rc != 0)
2520                         CERROR("failed to cleanup llogging subsystems\n");
2521                 break;
2522         default:
2523                 break;
2524         }
2525         RETURN(rc);
2526 }
2527
2528 static int lmv_get_info(struct obd_export *exp, __u32 keylen,
2529                         void *key, __u32 *vallen, void *val)
2530 {
2531         struct obd_device *obd;
2532         struct lmv_obd *lmv;
2533         int rc = 0;
2534         ENTRY;
2535
2536         obd = class_exp2obd(exp);
2537         if (obd == NULL) {
2538                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2539                        exp->exp_handle.h_cookie);
2540                 RETURN(-EINVAL);
2541         }
2542
2543         lmv = &obd->u.lmv;
2544         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2545                 struct lmv_tgt_desc *tgts;
2546                 int i;
2547
2548                 rc = lmv_check_connect(obd);
2549                 if (rc)
2550                         RETURN(rc);
2551
2552                 LASSERT(*vallen == sizeof(__u32));
2553                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count;
2554                      i++, tgts++) {
2555
2556                         /* all tgts should be connected when this get called. */
2557                         if (!tgts || !tgts->ltd_exp) {
2558                                 CERROR("target not setup?\n");
2559                                 continue;
2560                         }
2561
2562                         if (!obd_get_info(tgts->ltd_exp, keylen, key,
2563                                           vallen, val))
2564                                 RETURN(0);
2565                 }
2566                 RETURN(-EINVAL);
2567         } else if (KEY_IS(KEY_MAX_EASIZE) || KEY_IS(KEY_CONN_DATA)) {
2568                 rc = lmv_check_connect(obd);
2569                 if (rc)
2570                         RETURN(rc);
2571
2572                 /* forwarding this request to first MDS, it should know LOV
2573                  * desc. */
2574                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
2575                                   vallen, val);
2576                 if (!rc && KEY_IS(KEY_CONN_DATA)) {
2577                         exp->exp_connect_flags =
2578                         ((struct obd_connect_data *)val)->ocd_connect_flags;
2579                 }
2580                 RETURN(rc);
2581         }
2582
2583         CDEBUG(D_IOCTL, "invalid key\n");
2584         RETURN(-EINVAL);
2585 }
2586
2587 int lmv_set_info_async(struct obd_export *exp, obd_count keylen,
2588                        void *key, obd_count vallen, void *val,
2589                        struct ptlrpc_request_set *set)
2590 {
2591         struct lmv_tgt_desc    *tgt;
2592         struct obd_device      *obd;
2593         struct lmv_obd         *lmv;
2594         int rc = 0;
2595         ENTRY;
2596
2597         obd = class_exp2obd(exp);
2598         if (obd == NULL) {
2599                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2600                        exp->exp_handle.h_cookie);
2601                 RETURN(-EINVAL);
2602         }
2603         lmv = &obd->u.lmv;
2604
2605         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX) ||
2606             KEY_IS(KEY_INIT_RECOV_BACKUP)) {
2607                 int i, err = 0;
2608
2609                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2610                         tgt = &lmv->tgts[i];
2611
2612                         if (!tgt->ltd_exp)
2613                                 continue;
2614
2615                         err = obd_set_info_async(tgt->ltd_exp,
2616                                                  keylen, key, vallen, val, set);
2617                         if (err && rc == 0)
2618                                 rc = err;
2619                 }
2620
2621                 RETURN(rc);
2622         }
2623
2624         RETURN(-EINVAL);
2625 }
2626
2627 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2628                struct lov_stripe_md *lsm)
2629 {
2630         struct obd_device *obd = class_exp2obd(exp);
2631         struct lmv_obd *lmv = &obd->u.lmv;
2632         struct lmv_stripe_md *meap, *lsmp;
2633         int mea_size, i;
2634         ENTRY;
2635
2636         mea_size = lmv_get_easize(lmv);
2637         if (!lmmp)
2638                 RETURN(mea_size);
2639
2640         if (*lmmp && !lsm) {
2641                 OBD_FREE(*lmmp, mea_size);
2642                 *lmmp = NULL;
2643                 RETURN(0);
2644         }
2645
2646         if (*lmmp == NULL) {
2647                 OBD_ALLOC(*lmmp, mea_size);
2648                 if (*lmmp == NULL)
2649                         RETURN(-ENOMEM);
2650         }
2651
2652         if (!lsm)
2653                 RETURN(mea_size);
2654
2655         lsmp = (struct lmv_stripe_md *)lsm;
2656         meap = (struct lmv_stripe_md *)*lmmp;
2657
2658         if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2659             lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2660                 RETURN(-EINVAL);
2661
2662         meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2663         meap->mea_count = cpu_to_le32(lsmp->mea_count);
2664         meap->mea_master = cpu_to_le32(lsmp->mea_master);
2665
2666         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2667                 meap->mea_ids[i] = meap->mea_ids[i];
2668                 fid_cpu_to_le(&meap->mea_ids[i], &meap->mea_ids[i]);
2669         }
2670
2671         RETURN(mea_size);
2672 }
2673
2674 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2675                  struct lov_mds_md *lmm, int lmm_size)
2676 {
2677         struct obd_device *obd = class_exp2obd(exp);
2678         struct lmv_stripe_md **tmea = (struct lmv_stripe_md **)lsmp;
2679         struct lmv_stripe_md *mea = (struct lmv_stripe_md *)lmm;
2680         struct lmv_obd *lmv = &obd->u.lmv;
2681         int mea_size, i;
2682         __u32 magic;
2683         ENTRY;
2684
2685         mea_size = lmv_get_easize(lmv);
2686         if (lsmp == NULL)
2687                 return mea_size;
2688
2689         if (*lsmp != NULL && lmm == NULL) {
2690                 OBD_FREE(*tmea, mea_size);
2691                 *lsmp = NULL;
2692                 RETURN(0);
2693         }
2694
2695         LASSERT(mea_size == lmm_size);
2696
2697         OBD_ALLOC(*tmea, mea_size);
2698         if (*tmea == NULL)
2699                 RETURN(-ENOMEM);
2700
2701         if (!lmm)
2702                 RETURN(mea_size);
2703
2704         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2705             mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
2706             mea->mea_magic == MEA_MAGIC_HASH_SEGMENT)
2707         {
2708                 magic = le32_to_cpu(mea->mea_magic);
2709         } else {
2710                 /* old mea is not handled here */
2711                 LBUG();
2712         }
2713
2714         (*tmea)->mea_magic = magic;
2715         (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2716         (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2717
2718         for (i = 0; i < (*tmea)->mea_count; i++) {
2719                 (*tmea)->mea_ids[i] = mea->mea_ids[i];
2720                 fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]);
2721         }
2722         RETURN(mea_size);
2723 }
2724
2725 static int lmv_cancel_unused(struct obd_export *exp,
2726                              const struct lu_fid *fid,
2727                              ldlm_policy_data_t *policy,
2728                              ldlm_mode_t mode, int flags, void *opaque)
2729 {
2730         struct obd_device *obd = exp->exp_obd;
2731         struct lmv_obd *lmv = &obd->u.lmv;
2732         int rc = 0, err, i;
2733         ENTRY;
2734
2735         LASSERT(fid != NULL);
2736
2737         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2738                 if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].ltd_active)
2739                         continue;
2740
2741                 err = md_cancel_unused(lmv->tgts[i].ltd_exp, fid,
2742                                        policy, mode, flags, opaque);
2743                 if (!rc)
2744                         rc = err;
2745         }
2746         RETURN(rc);
2747 }
2748
2749 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data)
2750 {
2751         struct obd_device *obd = exp->exp_obd;
2752         struct lmv_obd *lmv = &obd->u.lmv;
2753
2754         ENTRY;
2755         RETURN(md_set_lock_data(lmv->tgts[0].ltd_exp, lockh, data));
2756 }
2757
2758 ldlm_mode_t lmv_lock_match(struct obd_export *exp, int flags,
2759                            const struct lu_fid *fid, ldlm_type_t type,
2760                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
2761                            struct lustre_handle *lockh)
2762 {
2763         struct obd_device *obd = exp->exp_obd;
2764         struct lmv_obd *lmv = &obd->u.lmv;
2765         ldlm_mode_t rc;
2766         int i;
2767         ENTRY;
2768
2769         CDEBUG(D_OTHER, "lock match for "DFID"\n", PFID(fid));
2770
2771         /* with CMD every object can have two locks in different namespaces:
2772          * lookup lock in space of mds storing direntry and update/open lock in
2773          * space of mds storing inode. Thus we check all targets, not only that
2774          * one fid was created in. */
2775         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2776                 rc = md_lock_match(lmv->tgts[i].ltd_exp, flags, fid,
2777                                    type, policy, mode, lockh);
2778                 if (rc)
2779                         RETURN(rc);
2780         }
2781
2782         RETURN(0);
2783 }
2784
2785 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2786                       struct obd_export *dt_exp, struct obd_export *md_exp,
2787                       struct lustre_md *md)
2788 {
2789         struct obd_device *obd = exp->exp_obd;
2790         struct lmv_obd *lmv = &obd->u.lmv;
2791         int rc;
2792
2793         ENTRY;
2794         rc = md_get_lustre_md(lmv->tgts[0].ltd_exp, req, dt_exp, md_exp, md);
2795         RETURN(rc);
2796 }
2797
2798 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2799 {
2800         struct obd_device *obd = exp->exp_obd;
2801         struct lmv_obd *lmv = &obd->u.lmv;
2802
2803         ENTRY;
2804         if (md->mea)
2805                 obd_free_memmd(exp, (struct lov_stripe_md**)&md->mea);
2806         RETURN(md_free_lustre_md(lmv->tgts[0].ltd_exp, md));
2807 }
2808
2809 int lmv_set_open_replay_data(struct obd_export *exp,
2810                              struct obd_client_handle *och,
2811                              struct ptlrpc_request *open_req)
2812 {
2813         struct obd_device *obd = exp->exp_obd;
2814         struct lmv_obd *lmv = &obd->u.lmv;
2815         struct obd_export *tgt_exp;
2816
2817         ENTRY;
2818
2819         tgt_exp = lmv_find_export(lmv, &och->och_fid);
2820         if (IS_ERR(tgt_exp))
2821                 RETURN(PTR_ERR(tgt_exp));
2822
2823         RETURN(md_set_open_replay_data(tgt_exp, och, open_req));
2824 }
2825
2826 int lmv_clear_open_replay_data(struct obd_export *exp,
2827                                struct obd_client_handle *och)
2828 {
2829         struct obd_device *obd = exp->exp_obd;
2830         struct lmv_obd *lmv = &obd->u.lmv;
2831         struct obd_export *tgt_exp;
2832         ENTRY;
2833
2834         tgt_exp = lmv_find_export(lmv, &och->och_fid);
2835         if (IS_ERR(tgt_exp))
2836                 RETURN(PTR_ERR(tgt_exp));
2837
2838         RETURN(md_clear_open_replay_data(tgt_exp, och));
2839 }
2840
2841 static int lmv_get_remote_perm(struct obd_export *exp,
2842                                const struct lu_fid *fid,
2843                                struct obd_capa *oc, __u32 suppgid,
2844                                struct ptlrpc_request **request)
2845 {
2846         struct obd_device *obd = exp->exp_obd;
2847         struct lmv_obd *lmv = &obd->u.lmv;
2848         struct obd_export *tgt_exp;
2849         int rc;
2850
2851         ENTRY;
2852
2853         rc = lmv_check_connect(obd);
2854         if (rc)
2855                 RETURN(rc);
2856
2857         tgt_exp = lmv_find_export(lmv, fid);
2858         if (IS_ERR(tgt_exp))
2859                 RETURN(PTR_ERR(tgt_exp));
2860
2861         rc = md_get_remote_perm(tgt_exp, fid, oc, suppgid, request);
2862
2863         RETURN(rc);
2864 }
2865
2866 static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
2867                           renew_capa_cb_t cb)
2868 {
2869         struct obd_device *obd = exp->exp_obd;
2870         struct lmv_obd *lmv = &obd->u.lmv;
2871         struct obd_export *tgt_exp;
2872         int rc;
2873         ENTRY;
2874
2875         rc = lmv_check_connect(obd);
2876         if (rc)
2877                 RETURN(rc);
2878
2879         tgt_exp = lmv_find_export(lmv, &oc->c_capa.lc_fid);
2880         if (IS_ERR(tgt_exp))
2881                 RETURN(PTR_ERR(tgt_exp));
2882
2883         rc = md_renew_capa(tgt_exp, oc, cb);
2884         RETURN(rc);
2885 }
2886
2887 struct obd_ops lmv_obd_ops = {
2888         .o_owner                = THIS_MODULE,
2889         .o_setup                = lmv_setup,
2890         .o_cleanup              = lmv_cleanup,
2891         .o_precleanup           = lmv_precleanup,
2892         .o_process_config       = lmv_process_config,
2893         .o_connect              = lmv_connect,
2894         .o_disconnect           = lmv_disconnect,
2895         .o_statfs               = lmv_statfs,
2896         .o_llog_init            = lmv_llog_init,
2897         .o_llog_finish          = lmv_llog_finish,
2898         .o_get_info             = lmv_get_info,
2899         .o_set_info_async       = lmv_set_info_async,
2900         .o_packmd               = lmv_packmd,
2901         .o_unpackmd             = lmv_unpackmd,
2902         .o_notify               = lmv_notify,
2903         .o_iocontrol            = lmv_iocontrol,
2904         .o_fid_delete           = lmv_fid_delete
2905 };
2906
2907 struct md_ops lmv_md_ops = {
2908         .m_getstatus            = lmv_getstatus,
2909         .m_change_cbdata        = lmv_change_cbdata,
2910         .m_close                = lmv_close,
2911         .m_create               = lmv_create,
2912         .m_done_writing         = lmv_done_writing,
2913         .m_enqueue              = lmv_enqueue,
2914         .m_getattr              = lmv_getattr,
2915         .m_getxattr             = lmv_getxattr,
2916         .m_getattr_name         = lmv_getattr_name,
2917         .m_intent_lock          = lmv_intent_lock,
2918         .m_link                 = lmv_link,
2919         .m_rename               = lmv_rename,
2920         .m_setattr              = lmv_setattr,
2921         .m_setxattr             = lmv_setxattr,
2922         .m_sync                 = lmv_sync,
2923         .m_readpage             = lmv_readpage,
2924         .m_unlink               = lmv_unlink,
2925         .m_init_ea_size         = lmv_init_ea_size,
2926         .m_cancel_unused        = lmv_cancel_unused,
2927         .m_set_lock_data        = lmv_set_lock_data,
2928         .m_lock_match           = lmv_lock_match,
2929         .m_get_lustre_md        = lmv_get_lustre_md,
2930         .m_free_lustre_md       = lmv_free_lustre_md,
2931         .m_set_open_replay_data = lmv_set_open_replay_data,
2932         .m_clear_open_replay_data = lmv_clear_open_replay_data,
2933         .m_get_remote_perm      = lmv_get_remote_perm,
2934         .m_renew_capa           = lmv_renew_capa
2935 };
2936
2937 int __init lmv_init(void)
2938 {
2939         struct lprocfs_static_vars lvars;
2940         int rc;
2941
2942         obj_cache = cfs_mem_cache_create("lmv_objects",
2943                                       sizeof(struct lmv_obj),
2944                                       0, 0);
2945         if (!obj_cache) {
2946                 CERROR("error allocating lmv objects cache\n");
2947                 return -ENOMEM;
2948         }
2949
2950         lprocfs_lmv_init_vars(&lvars);
2951         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2952                                  lvars.module_vars, LUSTRE_LMV_NAME, NULL);
2953         if (rc)
2954                 cfs_mem_cache_destroy(obj_cache);
2955
2956         return rc;
2957 }
2958
2959 #ifdef __KERNEL__
2960 static void lmv_exit(void)
2961 {
2962         int rc;
2963
2964         class_unregister_type(LUSTRE_LMV_NAME);
2965
2966         rc = cfs_mem_cache_destroy(obj_cache);
2967         LASSERTF(rc == 0,
2968                  "can't free lmv objects cache, %d object(s)"
2969                  "still in use\n", atomic_read(&obj_cache_count));
2970 }
2971
2972 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2973 MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
2974 MODULE_LICENSE("GPL");
2975
2976 module_init(lmv_init);
2977 module_exit(lmv_exit);
2978 #endif