Whamcloud - gitweb
- landing of b_fid after merge with b_hd_cleanup_merge.
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #include <linux/seq_file.h>
34 #else
35 #include <liblustre.h>
36 #endif
37 #include <linux/ext2_fs.h>
38
39 #include <linux/obd_support.h>
40 #include <linux/lustre_lib.h>
41 #include <linux/lustre_net.h>
42 #include <linux/lustre_idl.h>
43 #include <linux/lustre_dlm.h>
44 #include <linux/lustre_mds.h>
45 #include <linux/obd_class.h>
46 #include <linux/obd_ost.h>
47 #include <linux/lprocfs_status.h>
48 #include <linux/lustre_fsfilt.h>
49 #include <linux/obd_lmv.h>
50 #include "lmv_internal.h"
51
52 static void lmv_activate_target(struct lmv_obd *lmv,
53                                 struct lmv_tgt_desc *tgt,
54                                 int activate)
55 {
56         if (tgt->active == activate)
57                 return;
58         
59         tgt->active = activate;
60         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
61 }
62
63 /* Error codes:
64  *
65  *  -EINVAL  : UUID can't be found in the LMV's target list
66  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
67  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
68  */
69 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
70                               int activate)
71 {
72         struct obd_device *obd;
73         struct lmv_tgt_desc *tgt;
74         int i, rc = 0;
75         ENTRY;
76
77         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
78                lmv, uuid->uuid, activate);
79
80         spin_lock(&lmv->lmv_lock);
81         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
82                 if (tgt->ltd_exp == NULL)
83                         continue;
84
85                 CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n",
86                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
87
88                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
89                         break;
90         }
91
92         if (i == lmv->desc.ld_tgt_count)
93                 GOTO(out_lmv_lock, rc = -EINVAL);
94
95         obd = class_exp2obd(tgt->ltd_exp);
96         if (obd == NULL)
97                 GOTO(out_lmv_lock, rc = -ENOTCONN);
98
99         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
100                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
101                obd->obd_type->typ_name, i);
102         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
103
104         if (tgt->active == activate) {
105                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
106                        activate ? "" : "in");
107                 GOTO(out_lmv_lock, rc);
108         }
109
110         CDEBUG(D_INFO, "Marking OBD %p %sactive\n",
111                obd, activate ? "" : "in");
112
113         lmv_activate_target(lmv, tgt, activate);
114
115         EXIT;
116         
117  out_lmv_lock:
118         spin_unlock(&lmv->lmv_lock);
119         return rc;
120 }
121
122 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
123                       int active, void *data)
124 {
125         int rc;
126         struct obd_uuid *uuid;
127
128         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
129                 CERROR("unexpected notification of %s %s!\n",
130                        watched->obd_type->typ_name,
131                        watched->obd_name);
132                 return -EINVAL;
133         }
134         uuid = &watched->u.cli.cl_import->imp_target_uuid;
135
136         /* Set MDC as active before notifying the observer, so the observer can
137          * use the MDC normally.
138          */
139         rc = lmv_set_mdc_active(&obd->u.lmv, uuid, active);
140         if (rc) {
141                 CERROR("%sactivation of %s failed: %d\n",
142                        active ? "" : "de", uuid->uuid, rc);
143                 RETURN(rc);
144         }
145
146         if (obd->obd_observer)
147                 /* Pass the notification up the chain. */
148                 rc = obd_notify(obd->obd_observer, watched, active, data);
149
150         RETURN(rc);
151 }
152
153 int lmv_attach(struct obd_device *dev, obd_count len, void *data)
154 {
155         struct lprocfs_static_vars lvars;
156         int rc;
157         ENTRY;
158
159         lprocfs_init_vars(lmv, &lvars);
160         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
161         if (rc == 0) {
162 #ifdef __KERNEL__
163                 struct proc_dir_entry *entry;
164                 
165                 entry = create_proc_entry("target_obd_status", 0444, 
166                                            dev->obd_proc_entry);
167                 if (entry == NULL)
168                         RETURN(-ENOMEM);
169                 entry->proc_fops = &lmv_proc_target_fops; 
170                 entry->data = dev;
171 #endif
172        }
173         RETURN (rc);
174 }
175
/* Detach-time hook: hand @dev to lprocfs_obd_detach() and pass its result
 * back to the caller. */
int lmv_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
180
181 /* this is fake connect function. Its purpose is to initialize lmv and say
182  * caller that everything is okay. Real connection will be performed later. */
183 static int lmv_connect(struct lustre_handle *conn, struct obd_device *obd,
184                        struct obd_uuid *cluuid, unsigned long connect_flags)
185 {
186         struct lmv_obd *lmv = &obd->u.lmv;
187         struct obd_export *exp;
188         struct proc_dir_entry *lmv_proc_dir;
189         int rc;
190         ENTRY;
191
192         rc = class_connect(conn, obd, cluuid);
193         if (rc) {
194                 CERROR("class_connection() returned %d\n", rc);
195                 RETURN(rc);
196         }
197
198         exp = class_conn2export(conn);
199         
200         /* we don't want to actually do the underlying connections more than
201          * once, so keep track. */
202         lmv->refcount++;
203         if (lmv->refcount > 1) {
204                 class_export_put(exp);
205                 RETURN(0);
206         }
207
208         lmv->exp = exp;
209         lmv->connected = 0;
210         lmv->cluuid = *cluuid;
211         sema_init(&lmv->init_sem, 1);
212         lmv->connect_flags = connect_flags;
213
214         lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
215                                         NULL, NULL);
216         if (IS_ERR(lmv_proc_dir)) {
217                 CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
218                        obd->obd_type->typ_name, obd->obd_name);
219                 lmv_proc_dir = NULL;
220         }
221
222
223         RETURN(0);
224 }
225
226 void lmv_set_timeouts(struct obd_device *obd)
227 {
228         struct lmv_tgt_desc *tgts;
229         struct lmv_obd *lmv;
230         int i;
231
232         lmv = &obd->u.lmv;
233         if (lmv->server_timeout == 0)
234                 return;
235
236         if (lmv->connected == 0)
237                 return;
238
239         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
240                 if (tgts->ltd_exp == NULL)
241                         continue;
242                 
243                 obd_set_info(tgts->ltd_exp, strlen("inter_mds"),
244                              "inter_mds", 0, NULL);
245         }
246 }
247
248 #define MAX_STRING_SIZE 128
249
250 /* performs a check if passed obd is connected. If no - connect it. */
251 int lmv_check_connect(struct obd_device *obd)
252 {
253 #ifdef __KERNEL__
254         struct proc_dir_entry *lmv_proc_dir;
255 #endif
256         struct lmv_obd *lmv = &obd->u.lmv;
257         struct lmv_tgt_desc *tgts;
258         struct obd_uuid *cluuid;
259         struct obd_export *exp;
260         int rc, rc2, i;
261
262         if (lmv->connected)
263                 return 0;
264         
265         down(&lmv->init_sem);
266         if (lmv->connected) {
267                 up(&lmv->init_sem);
268                 return 0;
269         }
270
271         cluuid = &lmv->cluuid;
272         exp = lmv->exp;
273         
274         CDEBUG(D_OTHER, "time to connect %s to %s\n",
275                cluuid->uuid, obd->obd_name);
276
277         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
278                 struct obd_device *tgt_obd;
279                 struct lustre_handle conn = {0, };
280                 struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" };
281
282                 LASSERT(tgts != NULL);
283
284                 tgt_obd = class_find_client_obd(&tgts->uuid, LUSTRE_MDC_NAME, 
285                                                 &obd->obd_uuid);
286                 if (!tgt_obd) {
287                         CERROR("Target %s not attached\n", tgts->uuid.uuid);
288                         GOTO(out_disc, rc = -EINVAL);
289                 }
290
291                 /* for MDS: don't connect to yourself */
292                 if (obd_uuid_equals(&tgts->uuid, cluuid)) {
293                         CDEBUG(D_OTHER, "don't connect back to %s\n",
294                                cluuid->uuid);
295                         tgts->ltd_exp = NULL;
296                         continue;
297                 }
298
299                 CDEBUG(D_OTHER, "connect to %s(%s) - %s, %s FOR %s\n",
300                         tgt_obd->obd_name, tgt_obd->obd_uuid.uuid,
301                         tgts->uuid.uuid, obd->obd_uuid.uuid,
302                         cluuid->uuid);
303
304                 if (!tgt_obd->obd_set_up) {
305                         CERROR("Target %s not set up\n", tgts->uuid.uuid);
306                         GOTO(out_disc, rc = -EINVAL);
307                 }
308                 
309                 rc = obd_connect(&conn, tgt_obd, &lmv_mdc_uuid,
310                                  lmv->connect_flags);
311                 if (rc) {
312                         CERROR("Target %s connect error %d\n",
313                                 tgts->uuid.uuid, rc);
314                         GOTO(out_disc, rc);
315                 }
316                 tgts->ltd_exp = class_conn2export(&conn);
317
318                 obd_init_ea_size(tgts->ltd_exp, lmv->max_easize,
319                                  lmv->max_cookiesize);
320
321                 rc = obd_register_observer(tgt_obd, obd);
322                 if (rc) {
323                         CERROR("Target %s register_observer error %d\n",
324                                tgts->uuid.uuid, rc);
325                         obd_disconnect(tgts->ltd_exp, 0);
326                         GOTO(out_disc, rc);
327                 }
328
329                 lmv->desc.ld_active_tgt_count++;
330                 tgts->active = 1;
331
332                 CDEBUG(D_OTHER, "connected to %s(%s) successfully (%d)\n",
333                         tgt_obd->obd_name, tgt_obd->obd_uuid.uuid,
334                         atomic_read(&obd->obd_refcount));
335
336 #ifdef __KERNEL__
337                 lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
338                 if (lmv_proc_dir) {
339                         struct obd_device *mdc_obd = class_conn2obd(&conn);
340                         struct proc_dir_entry *mdc_symlink;
341                         char name[MAX_STRING_SIZE + 1];
342
343                         LASSERT(mdc_obd != NULL);
344                         LASSERT(mdc_obd->obd_type != NULL);
345                         LASSERT(mdc_obd->obd_type->typ_name != NULL);
346                         name[MAX_STRING_SIZE] = '\0';
347                         snprintf(name, MAX_STRING_SIZE, "../../../%s/%s",
348                                  mdc_obd->obd_type->typ_name,
349                                  mdc_obd->obd_name);
350                         mdc_symlink = proc_symlink(mdc_obd->obd_name,
351                                                    lmv_proc_dir, name);
352                         if (mdc_symlink == NULL) {
353                                 CERROR("could not register LMV target "
354                                        "/proc/fs/lustre/%s/%s/target_obds/%s.",
355                                        obd->obd_type->typ_name, obd->obd_name,
356                                        mdc_obd->obd_name);
357                                 lprocfs_remove(lmv_proc_dir);
358                                 lmv_proc_dir = NULL;
359                         }
360                 }
361 #endif
362         }
363
364         lmv_set_timeouts(obd);
365         class_export_put(exp);
366         lmv->connected = 1;
367         up(&lmv->init_sem);
368         return 0;
369
370  out_disc:
371         while (i-- > 0) {
372                 struct obd_uuid uuid;
373                 --tgts;
374                 --lmv->desc.ld_active_tgt_count;
375                 tgts->active = 0;
376                 /* save for CERROR below; (we know it's terminated) */
377                 uuid = tgts->uuid;
378                 rc2 = obd_disconnect(tgts->ltd_exp, 0);
379                 if (rc2)
380                         CERROR("error: LMV target %s disconnect on MDC idx %d: "
381                                "error %d\n", uuid.uuid, i, rc2);
382         }
383         class_disconnect(exp, 0);
384         up(&lmv->init_sem);
385         RETURN (rc);
386 }
387
388 static int lmv_disconnect(struct obd_export *exp, int flags)
389 {
390         struct obd_device *obd = class_exp2obd(exp);
391         struct lmv_obd *lmv = &obd->u.lmv;
392
393 #ifdef __KERNEL__
394         struct proc_dir_entry *lmv_proc_dir;
395 #endif
396         int rc, i;
397         ENTRY;
398
399         if (!lmv->tgts)
400                 goto out_local;
401
402         /* Only disconnect the underlying layers on the final disconnect. */
403         lmv->refcount--;
404         if (lmv->refcount != 0)
405                 goto out_local;
406
407 #ifdef __KERNEL__
408         lmv_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
409 #endif
410
411         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
412                 struct obd_device *mdc_obd; 
413                 
414                 if (lmv->tgts[i].ltd_exp == NULL)
415                         continue;
416
417                 mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp);
418
419 #ifdef __KERNEL__
420                 if (lmv_proc_dir) {
421                         struct proc_dir_entry *mdc_symlink;
422
423                         mdc_symlink = lprocfs_srch(lmv_proc_dir, mdc_obd->obd_name);
424                         if (mdc_symlink) {
425                                 lprocfs_remove(mdc_symlink);
426                         } else {
427                                 CERROR("/proc/fs/lustre/%s/%s/target_obds/%s missing\n",
428                                        obd->obd_type->typ_name, obd->obd_name,
429                                        mdc_obd->obd_name);
430                         }
431                 }
432 #endif
433                 if (obd->obd_no_recov) {
434                         if (mdc_obd)
435                                 mdc_obd->obd_no_recov = 1;
436                 }
437                 CDEBUG(D_OTHER, "disconnected from %s(%s) successfully\n",
438                         lmv->tgts[i].ltd_exp->exp_obd->obd_name,
439                         lmv->tgts[i].ltd_exp->exp_obd->obd_uuid.uuid);
440
441                 obd_register_observer(lmv->tgts[i].ltd_exp->exp_obd, NULL);
442                 rc = obd_disconnect(lmv->tgts[i].ltd_exp, flags);
443                 if (rc) {
444                         if (lmv->tgts[i].active) {
445                                 CERROR("Target %s disconnect error %d\n",
446                                        lmv->tgts[i].uuid.uuid, rc);
447                         }
448                         rc = 0;
449                 }
450                 
451                 lmv_activate_target(lmv, &lmv->tgts[i], 0);
452                 lmv->tgts[i].ltd_exp = NULL;
453         }
454
455 #ifdef __KERNEL__
456         if (lmv_proc_dir) {
457                 lprocfs_remove(lmv_proc_dir);
458         } else {
459                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
460                        obd->obd_type->typ_name, obd->obd_name);
461         }
462 #endif
463
464 out_local:
465         /* this is the case when no real connection is established by
466          * lmv_check_connect(). */
467         if (!lmv->connected)
468                 class_export_put(exp);
469         rc = class_disconnect(exp, 0);
470         if (lmv->refcount == 0)
471                 lmv->connected = 0;
472         RETURN(rc);
473 }
474
475 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
476                          int len, void *karg, void *uarg)
477 {
478         struct obd_device *obddev = class_exp2obd(exp);
479         struct lmv_obd *lmv = &obddev->u.lmv;
480         int i, rc = 0, set = 0;
481
482         ENTRY;
483
484         if (lmv->desc.ld_tgt_count == 0)
485                 RETURN(-ENOTTY);
486         
487         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
488                 int err;
489
490                 if (lmv->tgts[i].ltd_exp == NULL)
491                         continue;
492
493                 err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp, len, karg, uarg);
494                 if (err) {
495                         if (lmv->tgts[i].active) {
496                                 CERROR("error: iocontrol MDC %s on MDT"
497                                        "idx %d: err = %d\n",
498                                        lmv->tgts[i].uuid.uuid, i, err);
499                                 if (!rc)
500                                         rc = err;
501                         }
502                 } else
503                         set = 1;
504         }
505         if (!set && !rc)
506                 rc = -EIO;
507
508         RETURN(rc);
509 }
510
511 static int lmv_setup(struct obd_device *obd, obd_count len, void *buf)
512 {
513         int i, rc = 0;
514         struct lmv_desc *desc;
515         struct obd_uuid *uuids;
516         struct lmv_tgt_desc *tgts;
517         struct obd_device *tgt_obd;
518         struct lustre_cfg *lcfg = buf;
519         struct lmv_obd *lmv = &obd->u.lmv;
520         ENTRY;
521
522         if (lcfg->lcfg_inllen1 < 1) {
523                 CERROR("LMV setup requires a descriptor\n");
524                 RETURN(-EINVAL);
525         }
526
527         if (lcfg->lcfg_inllen2 < 1) {
528                 CERROR("LMV setup requires an OST UUID list\n");
529                 RETURN(-EINVAL);
530         }
531
532         desc = (struct lmv_desc *)lcfg->lcfg_inlbuf1;
533         if (sizeof(*desc) > lcfg->lcfg_inllen1) {
534                 CERROR("descriptor size wrong: %d > %d\n",
535                        (int)sizeof(*desc), lcfg->lcfg_inllen1);
536                 RETURN(-EINVAL);
537         }
538
539         uuids = (struct obd_uuid *)lcfg->lcfg_inlbuf2;
540         if (sizeof(*uuids) * desc->ld_tgt_count != lcfg->lcfg_inllen2) {
541                 CERROR("UUID array size wrong: %u * %u != %u\n",
542                        sizeof(*uuids), desc->ld_tgt_count, lcfg->lcfg_inllen2);
543                 RETURN(-EINVAL);
544         }
545
546         lmv->tgts_size = sizeof(struct lmv_tgt_desc) * desc->ld_tgt_count;
547         OBD_ALLOC(lmv->tgts, lmv->tgts_size);
548         if (lmv->tgts == NULL) {
549                 CERROR("Out of memory\n");
550                 RETURN(-ENOMEM);
551         }
552
553         lmv->desc = *desc;
554         spin_lock_init(&lmv->lmv_lock);
555         
556         for (i = 0, tgts = lmv->tgts; i < desc->ld_tgt_count; i++, tgts++)
557                 tgts->uuid = uuids[i];
558         
559         lmv->max_cookiesize = 0;
560
561         lmv->max_easize = sizeof(struct lustre_id) *
562                 desc->ld_tgt_count + sizeof(struct mea);
563         
564         rc = lmv_setup_mgr(obd);
565         if (rc) {
566                 CERROR("Can't setup LMV object manager, "
567                        "error %d.\n", rc);
568                 OBD_FREE(lmv->tgts, lmv->tgts_size);
569         }
570
571         tgt_obd = class_find_client_obd(&lmv->tgts->uuid, LUSTRE_MDC_NAME, 
572                                         &obd->obd_uuid);
573         if (!tgt_obd) {
574                 CERROR("Target %s not attached\n", lmv->tgts->uuid.uuid);
575                 RETURN(-EINVAL);
576         }
577
578         RETURN(rc);
579 }
580
581 static int lmv_cleanup(struct obd_device *obd, int flags) 
582 {
583         struct lmv_obd *lmv = &obd->u.lmv;
584         ENTRY;
585
586         lmv_cleanup_mgr(obd);
587         OBD_FREE(lmv->tgts, lmv->tgts_size);
588         
589         RETURN(0);
590 }
591
592 static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
593                       unsigned long max_age)
594 {
595         struct lmv_obd *lmv = &obd->u.lmv;
596         struct obd_statfs temp;
597         int rc = 0, i;
598         ENTRY;
599         
600         rc = lmv_check_connect(obd);
601         if (rc)
602                 RETURN(rc);
603                 
604         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
605                 if (lmv->tgts[i].ltd_exp == NULL) {
606                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
607                         continue;
608                 }
609
610                 rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, &temp, max_age);
611                 if (rc) {
612                         CERROR("can't stat MDS #%d (%s)\n", i,
613                                lmv->tgts[i].ltd_exp->exp_obd->obd_name);
614                         RETURN(rc);
615                 }
616                 if (i == 0) {
617                         memcpy(osfs, &temp, sizeof(temp));
618                 } else {
619                         osfs->os_bavail += temp.os_bavail;
620                         osfs->os_blocks += temp.os_blocks;
621                         osfs->os_ffree += temp.os_ffree;
622                         osfs->os_files += temp.os_files;
623                 }
624         }
625         RETURN(rc);
626 }
627
628 static int lmv_getstatus(struct obd_export *exp, struct lustre_id *id)
629 {
630         struct obd_device *obd = exp->exp_obd;
631         struct lmv_obd *lmv = &obd->u.lmv;
632         int rc;
633         ENTRY;
634
635         rc = lmv_check_connect(obd);
636         if (rc)
637                 RETURN(rc);
638
639         rc = md_getstatus(lmv->tgts[0].ltd_exp, id);
640         id_group(id) = 0;
641         
642         RETURN(rc);
643 }
644
645 static int lmv_getattr(struct obd_export *exp, struct lustre_id *id,
646                        unsigned long valid, unsigned int ea_size,
647                        struct ptlrpc_request **request)
648 {
649         struct obd_device *obd = exp->exp_obd;
650         struct lmv_obd *lmv = &obd->u.lmv;
651         int rc, i = id_group(id);
652         struct lmv_obj *obj;
653         ENTRY;
654
655         rc = lmv_check_connect(obd);
656         if (rc)
657                 RETURN(rc);
658
659         LASSERT(i < lmv->desc.ld_tgt_count);
660
661         rc = md_getattr(lmv->tgts[i].ltd_exp, id, valid,
662                         ea_size, request);
663         if (rc)
664                 RETURN(rc);
665         
666         obj = lmv_grab_obj(obd, id);
667         
668         CDEBUG(D_OTHER, "GETATTR for "DLID4" %s\n",
669                OLID4(id), obj ? "(splitted)" : "");
670
671         /* if object is splitted, then we loop over all the slaves and gather
672          * size attribute. In ideal world we would have to gather also mds field
673          * from all slaves, as object is spread over the cluster and this is
674          * definitely interesting information and it is not good to loss it,
675          * but...*/
676         if (obj) {
677                 struct mds_body *body;
678
679                 if (*request == NULL) {
680                         lmv_put_obj(obj);
681                         RETURN(rc);
682                 }
683                         
684                 body = lustre_msg_buf((*request)->rq_repmsg, 0,
685                                       sizeof(*body));
686                 LASSERT(body != NULL);
687
688                 lmv_lock_obj(obj);
689         
690                 for (i = 0; i < obj->objcount; i++) {
691
692                         if (lmv->tgts[i].ltd_exp == NULL) {
693                                 CWARN("%s: NULL export for %d\n",
694                                       obd->obd_name, i);
695                                 continue;
696                         }
697
698                         /* skip master obj. */
699                         if (lmv_id_equal(&obj->id, &obj->objs[i].id))
700                                 continue;
701                         
702                         body->size += obj->objs[i].size;
703                 }
704
705                 lmv_unlock_obj(obj);
706                 lmv_put_obj(obj);
707         }
708         
709         RETURN(rc);
710 }
711
712 static int lmv_change_cbdata(struct obd_export *exp,
713                              struct lustre_id *id, 
714                              ldlm_iterator_t it,
715                              void *data)
716 {
717         struct obd_device *obd = exp->exp_obd;
718         struct lmv_obd *lmv = &obd->u.lmv;
719         int rc = 0;
720         ENTRY;
721         
722         rc = lmv_check_connect(obd);
723         if (rc)
724                 RETURN(rc);
725         
726         CDEBUG(D_OTHER, "CBDATA for "DLID4"\n", OLID4(id));
727         LASSERT(id_group(id) < lmv->desc.ld_tgt_count);
728
729         rc = md_change_cbdata(lmv->tgts[id_group(id)].ltd_exp,
730                               id, it, data);
731         
732         RETURN(rc);
733 }
734
735 static int lmv_change_cbdata_name(struct obd_export *exp,
736                                   struct lustre_id *pid,
737                                   char *name, int len,
738                                   struct lustre_id *cid,
739                                   ldlm_iterator_t it, void *data)
740 {
741         struct obd_device *obd = exp->exp_obd;
742         struct lmv_obd *lmv = &obd->u.lmv;
743         struct lmv_obj *obj;
744         int rc = 0, mds;
745         ENTRY;
746
747         rc = lmv_check_connect(obd);
748         if (rc)
749                 RETURN(rc);
750
751         LASSERT(id_group(pid) < lmv->desc.ld_tgt_count);
752         LASSERT(id_group(cid) < lmv->desc.ld_tgt_count);
753         
754         CDEBUG(D_OTHER, "CBDATA for "DLID4":%*s -> "DLID4"\n",
755                OLID4(pid), len, name, OLID4(cid));
756
757         /* this is default mds for directory name belongs to. */
758         mds = id_group(pid);
759         obj = lmv_grab_obj(obd, pid);
760         if (obj) {
761                 /* directory is splitted. look for right mds for this name. */
762                 mds = raw_name2idx(obj->hashtype, obj->objcount, name, len);
763                 mds = id_group(&obj->objs[mds].id);
764                 lmv_put_obj(obj);
765         }
766         rc = md_change_cbdata(lmv->tgts[mds].ltd_exp, cid, it, data);
767         RETURN(rc);
768 }
769
770 static int lmv_valid_attrs(struct obd_export *exp, struct lustre_id *id) 
771 {
772         struct obd_device *obd = exp->exp_obd;
773         struct lmv_obd *lmv = &obd->u.lmv;
774         int rc = 0;
775         ENTRY;
776
777         rc = lmv_check_connect(obd);
778         if (rc)
779                 RETURN(rc);
780
781         CDEBUG(D_OTHER, "validate "DLID4"\n", OLID4(id));
782         LASSERT(id_group(id) < lmv->desc.ld_tgt_count);
783         rc = md_valid_attrs(lmv->tgts[id_group(id)].ltd_exp, id);
784         RETURN(rc);
785 }
786
787 int lmv_close(struct obd_export *exp, struct obdo *obdo,
788                   struct obd_client_handle *och,
789                   struct ptlrpc_request **request)
790 {
791         struct obd_device *obd = exp->exp_obd;
792         struct lmv_obd *lmv = &obd->u.lmv;
793         int rc, i = obdo->o_mds;
794         ENTRY;
795         rc = lmv_check_connect(obd);
796         if (rc)
797                 RETURN(rc);
798         LASSERT(i < lmv->desc.ld_tgt_count);
799         CDEBUG(D_OTHER, "CLOSE %lu/%lu/%lu\n", (unsigned long)obdo->o_mds,
800                (unsigned long)obdo->o_id, (unsigned long)obdo->o_generation);
801         rc = md_close(lmv->tgts[i].ltd_exp, obdo, och, request);
802         RETURN(rc);
803 }
804
805 int lmv_get_mea_and_update_object(struct obd_export *exp, 
806                                   struct lustre_id *id)
807 {
808         struct obd_device *obd = exp->exp_obd;
809         struct lmv_obd *lmv = &obd->u.lmv;
810         struct ptlrpc_request *req = NULL;
811         struct lmv_obj *obj;
812         struct lustre_md md;
813         unsigned long valid;
814         int mealen, rc;
815
816         md.mea = NULL;
817         mealen = MEA_SIZE_LMV(lmv);
818         
819         valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
820
821         /* time to update mea of parent id */
822         rc = md_getattr(lmv->tgts[id_group(id)].ltd_exp,
823                         id, valid, mealen, &req);
824         if (rc) {
825                 CERROR("md_getattr() failed, error %d\n", rc);
826                 GOTO(cleanup, rc);
827         }
828
829         rc = mdc_req2lustre_md(exp, req, 0, NULL, &md);
830         if (rc) {
831                 CERROR("mdc_req2lustre_md() failed, error %d\n", rc);
832                 GOTO(cleanup, rc);
833         }
834
835         if (md.mea == NULL)
836                 GOTO(cleanup, rc = -ENODATA);
837
838         obj = lmv_create_obj(exp, id, md.mea);
839         if (IS_ERR(obj))
840                 rc = PTR_ERR(obj);
841         
842         lmv_put_obj(obj);
843         obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
844
845 cleanup:
846         if (req)
847                 ptlrpc_req_finished(req);
848         RETURN(rc);
849 }
850
851 int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data,
852                const void *data, int datalen, int mode, __u32 uid,
853                __u32 gid, __u64 rdev, struct ptlrpc_request **request)
854 {
855         struct obd_device *obd = exp->exp_obd;
856         struct lmv_obd *lmv = &obd->u.lmv;
857         struct mds_body *body;
858         struct lmv_obj *obj;
859         int rc, mds, loop = 0;
860         ENTRY;
861
862         rc = lmv_check_connect(obd);
863         if (rc)
864                 RETURN(rc);
865
866         if (!lmv->desc.ld_active_tgt_count)
867                 RETURN(-EIO);
868 repeat:
869         LASSERT(++loop <= 2);
870         obj = lmv_grab_obj(obd, &op_data->id1);
871         if (obj) {
872                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
873                                    op_data->name, op_data->namelen);
874                 op_data->id1 = obj->objs[mds].id;
875                 lmv_put_obj(obj);
876         }
877
878         CDEBUG(D_OTHER, "CREATE '%*s' on "DLID4"\n", op_data->namelen,
879                op_data->name, OLID4(&op_data->id1));
880         
881         rc = md_create(lmv->tgts[id_group(&op_data->id1)].ltd_exp, 
882                        op_data, data, datalen, mode, uid, gid, rdev, request);
883         if (rc == 0) {
884                 if (*request == NULL)
885                         RETURN(rc);
886
887                 body = lustre_msg_buf((*request)->rq_repmsg, 0,
888                                       sizeof(*body));
889                 LASSERT(body != NULL);
890                 
891                 CDEBUG(D_OTHER, "created. "DLID4"\n", OLID4(&op_data->id1));
892                 
893 /*                LASSERT(body->valid & OBD_MD_MDS ||
894                         body->mds == id_group(&op_data->id1));*/
895         } else if (rc == -ERESTART) {
896                 /* directory got splitted. time to update local object and
897                  * repeat the request with proper MDS */
898                 rc = lmv_get_mea_and_update_object(exp, &op_data->id1);
899                 if (rc == 0) {
900                         ptlrpc_req_finished(*request);
901                         goto repeat;
902                 }
903         }
904         RETURN(rc);
905 }
906
907 int lmv_done_writing(struct obd_export *exp, struct obdo *obdo)
908 {
909         struct obd_device *obd = exp->exp_obd;
910         struct lmv_obd *lmv = &obd->u.lmv;
911         int rc;
912         ENTRY;
913         rc = lmv_check_connect(obd);
914         if (rc)
915                 RETURN(rc);
916
917         /* FIXME: choose right MDC here */
918         CWARN("this method isn't implemented yet\n");
919         rc = md_done_writing(lmv->tgts[0].ltd_exp, obdo);
920         RETURN(rc);
921 }
922
923 int lmv_enqueue_slaves(struct obd_export *exp, int locktype,
924                        struct lookup_intent *it, int lockmode,
925                        struct mdc_op_data *data, struct lustre_handle *lockh,
926                        void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
927                        ldlm_blocking_callback cb_blocking, void *cb_data)
928 {
929         struct obd_device *obd = exp->exp_obd;
930         struct lmv_obd *lmv = &obd->u.lmv;
931         struct mea *mea = data->mea1;
932         struct mdc_op_data data2;
933         int i, rc, mds;
934         ENTRY;
935
936         LASSERT(mea != NULL);
937         for (i = 0; i < mea->mea_count; i++) {
938                 memset(&data2, 0, sizeof(data2));
939                 data2.id1 = mea->mea_ids[i];
940                 mds = id_group(&data2.id1);
941                 
942                 if (lmv->tgts[mds].ltd_exp == NULL)
943                         continue;
944
945                 rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, 
946                                 lockmode, &data2, lockh + i, lmm, lmmsize, 
947                                 cb_compl, cb_blocking, cb_data);
948                 
949                 CDEBUG(D_OTHER, "take lock on slave "DLID4" -> %d/%d\n",
950                        OLID4(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
951                 if (rc)
952                         GOTO(cleanup, rc);
953                 if (it->d.lustre.it_data) {
954                         struct ptlrpc_request *req;
955                         req = (struct ptlrpc_request *) it->d.lustre.it_data;
956                         ptlrpc_req_finished(req);
957                 }
958                 
959                 if (it->d.lustre.it_status)
960                         GOTO(cleanup, rc = it->d.lustre.it_status);
961         }
962         RETURN(0);
963         
964 cleanup:
965         /* drop all taken locks */
966         while (--i >= 0) {
967                 if (lockh[i].cookie)
968                         ldlm_lock_decref(lockh + i, lockmode);
969                 lockh[i].cookie = 0;
970         }
971         RETURN(rc);
972 }
973
974 int lmv_enqueue(struct obd_export *exp, int lock_type,
975                 struct lookup_intent *it, int lock_mode,
976                 struct mdc_op_data *data, struct lustre_handle *lockh,
977                 void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
978                 ldlm_blocking_callback cb_blocking, void *cb_data)
979 {
980         struct obd_device *obd = exp->exp_obd;
981         struct lmv_obd *lmv = &obd->u.lmv;
982         struct lmv_obj *obj;
983         int rc, mds;
984         ENTRY;
985
986         rc = lmv_check_connect(obd);
987         if (rc)
988                 RETURN(rc);
989
990         if (it->it_op == IT_UNLINK) {
991                 rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
992                                         data, lockh, lmm, lmmsize,
993                                         cb_compl, cb_blocking, cb_data);
994                 RETURN(rc);
995         }
996
997         if (data->namelen) {
998                 obj = lmv_grab_obj(obd, &data->id1);
999                 if (obj) {
1000                         /* directory is splitted. look for right mds for this
1001                          * name */
1002                         mds = raw_name2idx(obj->hashtype, obj->objcount,
1003                                            (char *)data->name, data->namelen);
1004                         data->id1 = obj->objs[mds].id;
1005                         lmv_put_obj(obj);
1006                 }
1007         }
1008         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DLID4"\n", LL_IT2STR(it),
1009                OLID4(&data->id1));
1010         
1011         rc = md_enqueue(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1012                         lock_type, it, lock_mode, data, lockh, lmm, 
1013                         lmmsize, cb_compl, cb_blocking, cb_data);
1014
1015         RETURN(rc);
1016 }
1017
1018 int lmv_getattr_name(struct obd_export *exp, struct lustre_id *id,
1019                      char *filename, int namelen, unsigned long valid,
1020                      unsigned int ea_size, struct ptlrpc_request **request)
1021 {
1022         int rc, mds = id_group(id), loop = 0;
1023         struct obd_device *obd = exp->exp_obd;
1024         struct lmv_obd *lmv = &obd->u.lmv;
1025         struct lustre_id rid = *id;
1026         struct mds_body *body;
1027         struct lmv_obj *obj;
1028         ENTRY;
1029         rc = lmv_check_connect(obd);
1030         if (rc)
1031                 RETURN(rc);
1032 repeat:
1033         LASSERT(++loop <= 2);
1034         obj = lmv_grab_obj(obd, id);
1035         if (obj) {
1036                 /* directory is splitted. look for right mds for this name */
1037                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
1038                                    filename, namelen - 1);
1039                 rid = obj->objs[mds].id;
1040                 lmv_put_obj(obj);
1041         }
1042         
1043         CDEBUG(D_OTHER, "getattr_name for %*s on "DLID4" -> "DLID4"\n",
1044                namelen, filename, OLID4(id), OLID4(&rid));
1045
1046         rc = md_getattr_name(lmv->tgts[id_group(&rid)].ltd_exp, 
1047                              &rid, filename, namelen, valid, ea_size, request);
1048         if (rc == 0) {
1049                 /* this could be cross-node reference. in this case all we have
1050                  * right now is mds/ino/generation triple. we'd like to find
1051                  * other attributes */
1052                 body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*body));
1053                 LASSERT(body != NULL);
1054                 if (body->valid & OBD_MD_MDS) {
1055                         struct ptlrpc_request *req = NULL;
1056                         
1057                         rid = body->id1;
1058                         CDEBUG(D_OTHER, "request attrs for "DLID4"\n", OLID4(&rid));
1059                         
1060                         rc = md_getattr_name(lmv->tgts[id_group(&rid)].ltd_exp, 
1061                                              &rid, NULL, 1, valid, ea_size, &req);
1062                         ptlrpc_req_finished(*request);
1063                         *request = req;
1064                 }
1065         } else if (rc == -ERESTART) {
1066                 /* directory got splitted. time to update local object and
1067                  * repeat the request with proper MDS */
1068                 rc = lmv_get_mea_and_update_object(exp, &rid);
1069                 if (rc == 0) {
1070                         ptlrpc_req_finished(*request);
1071                         goto repeat;
1072                 }
1073         }
1074         RETURN(rc);
1075 }
1076
1077 /*
1078  * llite passes id of an target inode in data->id1 and id of directory in
1079  * data->id2
1080  */
1081 int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
1082              struct ptlrpc_request **request)
1083 {
1084         struct obd_device *obd = exp->exp_obd;
1085         struct lmv_obd *lmv = &obd->u.lmv;
1086         struct lmv_obj *obj;
1087         int rc;
1088         ENTRY;
1089         
1090         rc = lmv_check_connect(obd);
1091         if (rc)
1092                 RETURN(rc);
1093
1094         if (data->namelen != 0) {
1095                 /* usual link request */
1096                 obj = lmv_grab_obj(obd, &data->id1);
1097                 if (obj) {
1098                         rc = raw_name2idx(obj->hashtype, obj->objcount, 
1099                                           data->name, data->namelen);
1100                         data->id1 = obj->objs[rc].id;
1101                         lmv_put_obj(obj);
1102                 }
1103                 
1104                 CDEBUG(D_OTHER,"link "DLID4":%*s to "DLID4"\n",
1105                        OLID4(&data->id2), data->namelen, data->name,
1106                        OLID4(&data->id1));
1107         } else {
1108                 /* request from MDS to acquire i_links for inode by id1 */
1109                 CDEBUG(D_OTHER, "inc i_nlinks for "DLID4"\n",
1110                        OLID4(&data->id1));
1111         }
1112                         
1113         rc = md_link(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1114                      data, request);
1115         RETURN(rc);
1116 }
1117
1118 int lmv_rename(struct obd_export *exp, struct mdc_op_data *data,
1119                const char *old, int oldlen, const char *new, int newlen,
1120                struct ptlrpc_request **request)
1121 {
1122         struct obd_device *obd = exp->exp_obd;
1123         struct lmv_obd *lmv = &obd->u.lmv;
1124         struct lmv_obj *obj;
1125         int rc, mds;
1126         ENTRY;
1127
1128         CDEBUG(D_OTHER, "rename %*s in "DLID4" to %*s in "DLID4"\n",
1129                oldlen, old, OLID4(&data->id1), newlen, new, OLID4(&data->id2));
1130         
1131         if (!lmv_id_equal(&data->id1, &data->id2)) {
1132                 CDEBUG(D_OTHER,"cross-node rename "DLID4"/%*s to "DLID4"/%*s\n",
1133                        OLID4(&data->id1), oldlen, old, OLID4(&data->id2),
1134                        newlen, new);
1135         }
1136
1137         rc = lmv_check_connect(obd);
1138         if (rc)
1139                 RETURN(rc);
1140
1141         if (oldlen == 0) {
1142                 /* MDS with old dir entry is asking another MDS to create name
1143                  * there */
1144                 CDEBUG(D_OTHER,
1145                        "create %*s(%d/%d) in "DLID4" pointing "
1146                        "to "DLID4"\n", newlen, new, oldlen, newlen,
1147                        OLID4(&data->id2), OLID4(&data->id1));
1148                 mds = id_group(&data->id2);
1149                 goto request;
1150         }
1151
1152         obj = lmv_grab_obj(obd, &data->id1);
1153         if (obj) {
1154                 /* directory is already splitted, so we have to forward request
1155                  * to the right MDS */
1156                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
1157                                    (char *)old, oldlen);
1158                 data->id1 = obj->objs[mds].id;
1159                 CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", mds,
1160                        OLID4(&obj->objs[mds].id));
1161                 lmv_put_obj(obj);
1162         }
1163
1164         obj = lmv_grab_obj(obd, &data->id2);
1165         if (obj) {
1166                 /* directory is already splitted, so we have to forward request
1167                  * to the right MDS */
1168                 mds = raw_name2idx(obj->hashtype, obj->objcount, 
1169                                    (char *)new, newlen);
1170                 
1171                 data->id2 = obj->objs[mds].id;
1172                 CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n", mds,
1173                        OLID4(&obj->objs[mds].id));
1174                 lmv_put_obj(obj);
1175         }
1176         
1177         mds = id_group(&data->id1);
1178
1179 request:
1180         rc = md_rename(lmv->tgts[mds].ltd_exp, data, old, oldlen,
1181                        new, newlen, request); 
1182         RETURN(rc);
1183 }
1184
1185 int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data,
1186                 struct iattr *iattr, void *ea, int ealen, void *ea2,
1187                 int ea2len, struct ptlrpc_request **request)
1188 {
1189         struct obd_device *obd = exp->exp_obd;
1190         struct lmv_obd *lmv = &obd->u.lmv;
1191         struct ptlrpc_request *req;
1192         struct mds_body *body;
1193         struct lmv_obj *obj;
1194         int rc = 0, i;
1195         ENTRY;
1196
1197         rc = lmv_check_connect(obd);
1198         if (rc)
1199                 RETURN(rc);
1200
1201         obj = lmv_grab_obj(obd, &data->id1);
1202         
1203         CDEBUG(D_OTHER, "SETATTR for "DLID4", valid 0x%x%s\n",
1204                OLID4(&data->id1), iattr->ia_valid, obj ? ", splitted" : "");
1205         
1206         if (obj) {
1207                 for (i = 0; i < obj->objcount; i++) {
1208                         data->id1 = obj->objs[i].id;
1209                         
1210                         rc = md_setattr(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1211                                         data, iattr, ea, ealen, ea2, ea2len, &req);
1212
1213                         if (lmv_id_equal(&obj->id, &obj->objs[i].id)) {
1214                                 /* this is master object and this request should
1215                                  * be returned back to llite */
1216                                 *request = req;
1217                         } else {
1218                                 ptlrpc_req_finished(req);
1219                         }
1220
1221                         if (rc)
1222                                 break;
1223                 }
1224                 lmv_put_obj(obj);
1225         } else {
1226                 LASSERT(id_group(&data->id1) < lmv->desc.ld_tgt_count);
1227                 rc = md_setattr(lmv->tgts[id_group(&data->id1)].ltd_exp,
1228                                 data, iattr, ea, ealen, ea2, ea2len, request); 
1229                 if (rc == 0) {
1230                         body = lustre_msg_buf((*request)->rq_repmsg, 0,
1231                                               sizeof(*body));
1232                         LASSERT(body != NULL);
1233 //                        LASSERT(body->mds == id_group(&data->id1));
1234                 }
1235         }
1236         RETURN(rc);
1237 }
1238
1239 int lmv_sync(struct obd_export *exp, struct lustre_id *id,
1240              struct ptlrpc_request **request)
1241 {
1242         struct obd_device *obd = exp->exp_obd;
1243         struct lmv_obd *lmv = &obd->u.lmv;
1244         int rc;
1245         ENTRY;
1246
1247         rc = lmv_check_connect(obd);
1248         if (rc)
1249                 RETURN(rc);
1250
1251         rc = md_sync(lmv->tgts[id_group(id)].ltd_exp, 
1252                      id, request);
1253         RETURN(rc);
1254 }
1255
1256 int lmv_dirobj_blocking_ast(struct ldlm_lock *lock, 
1257                             struct ldlm_lock_desc *desc,
1258                             void *data, int flag)
1259 {
1260         struct lustre_handle lockh;
1261         struct lmv_obj *obj;
1262         int rc;
1263         ENTRY;
1264
1265         switch (flag) {
1266         case LDLM_CB_BLOCKING:
1267                 ldlm_lock2handle(lock, &lockh);
1268                 rc = ldlm_cli_cancel(&lockh);
1269                 if (rc < 0) {
1270                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
1271                         RETURN(rc);
1272                 }
1273                 break;
1274         case LDLM_CB_CANCELING:
1275                 /* time to drop cached attrs for dirobj */
1276                 obj = lock->l_ast_data;
1277                 if (obj) {
1278                         CDEBUG(D_OTHER, "cancel %s on "LPU64"/"LPU64
1279                                ", master "DLID4"\n",
1280                                lock->l_resource->lr_name.name[3] == 1 ?
1281                                "LOOKUP" : "UPDATE",
1282                                lock->l_resource->lr_name.name[0],
1283                                lock->l_resource->lr_name.name[1], 
1284                                OLID4(&obj->id));
1285                         lmv_put_obj(obj);
1286                 }
1287                 break;
1288         default:
1289                 LBUG();
1290         }
1291         RETURN(0);
1292 }
1293
1294 void lmv_remove_dots(struct page *page)
1295 {
1296         char *kaddr = page_address(page);
1297         unsigned limit = PAGE_CACHE_SIZE;
1298         unsigned offs, rec_len;
1299         struct ext2_dir_entry_2 *p;
1300
1301         for (offs = 0; offs <= limit - EXT2_DIR_REC_LEN(1); offs += rec_len) {
1302                 p = (struct ext2_dir_entry_2 *)(kaddr + offs);
1303                 rec_len = le16_to_cpu(p->rec_len);
1304
1305                 if ((p->name_len == 1 && p->name[0] == '.') ||
1306                     (p->name_len == 2 && p->name[0] == '.' && p->name[1] == '.'))
1307                         p->inode = 0;
1308         }
1309 }
1310
1311 int lmv_readpage(struct obd_export *exp, struct lustre_id *id,
1312                  __u64 offset, struct page *page,
1313                  struct ptlrpc_request **request)
1314 {
1315         struct obd_device *obd = exp->exp_obd;
1316         struct lmv_obd *lmv = &obd->u.lmv;
1317         struct lustre_id rid = *id;
1318         struct lmv_obj *obj;
1319         int rc, i;
1320         ENTRY;
1321
1322 #warning "we need well-desgined readdir() implementation"
1323         rc = lmv_check_connect(obd);
1324         if (rc)
1325                 RETURN(rc);
1326
1327         LASSERT(id_group(id) < lmv->desc.ld_tgt_count);
1328         CDEBUG(D_OTHER, "READPAGE at %llu from "DLID4"\n",
1329                offset, OLID4(&rid));
1330
1331         obj = lmv_grab_obj(obd, id);
1332         if (obj) {
1333                 lmv_lock_obj(obj);
1334
1335                 /* find dirobj containing page with requested offset. */
1336                 for (i = 0; i < obj->objcount; i++) {
1337                         if (offset < obj->objs[i].size)
1338                                 break;
1339                         offset -= obj->objs[i].size;
1340                 }
1341                 rid = obj->objs[i].id;
1342                 
1343                 lmv_unlock_obj(obj);
1344                 lmv_put_obj(obj);
1345                 
1346                 CDEBUG(D_OTHER, "forward to "DLID4" with offset %lu\n",
1347                        OLID4(&rid), (unsigned long)offset);
1348         }
1349         rc = md_readpage(lmv->tgts[id_group(&rid)].ltd_exp, &rid, 
1350                          offset, page, request);
1351         
1352         if (rc == 0 && !lmv_id_equal(&rid, id))
1353                 /* this page isn't from master object. To avoid "." and ".." 
1354                  * duplication in directory, we have to remove them from all
1355                  * slave objects */
1356                 lmv_remove_dots(page);
1357         
1358         RETURN(rc);
1359 }
1360
1361 int lmv_unlink_slaves(struct obd_export *exp, struct mdc_op_data *data,
1362                       struct ptlrpc_request **req)
1363 {
1364         struct obd_device *obd = exp->exp_obd;
1365         struct lmv_obd *lmv = &obd->u.lmv;
1366         struct mea *mea = data->mea1;
1367         struct mdc_op_data data2;
1368         int i, rc = 0, mds;
1369         ENTRY;
1370
1371         LASSERT(mea != NULL);
1372         for (i = 0; i < mea->mea_count; i++) {
1373                 memset(&data2, 0, sizeof(data2));
1374                 data2.id1 = mea->mea_ids[i];
1375                 data2.create_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
1376                 
1377                 mds = id_group(&data2.id1);
1378
1379                 if (lmv->tgts[mds].ltd_exp == NULL)
1380                         continue;
1381
1382                 rc = md_unlink(lmv->tgts[mds].ltd_exp, &data2, req);
1383                 CDEBUG(D_OTHER, "unlink slave "DLID4" -> %d\n",
1384                        OLID4(&mea->mea_ids[i]), rc);
1385                 if (*req) {
1386                         ptlrpc_req_finished(*req);
1387                         *req = NULL;
1388                 }
1389                 if (rc)
1390                         break;
1391         }
1392         RETURN(rc);
1393 }
1394
1395 int lmv_delete_object(struct obd_export *exp, struct lustre_id *id)
1396 {
1397         ENTRY;
1398
1399         if (!lmv_delete_obj(exp, id)) {
1400                 CDEBUG(D_OTHER, "object "DLID4" is not found.\n",
1401                        OLID4(id));
1402         }
1403         
1404         RETURN(0);
1405 }
1406
1407 int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data,
1408                struct ptlrpc_request **request)
1409 {
1410         struct obd_device *obd = exp->exp_obd;
1411         struct lmv_obd *lmv = &obd->u.lmv;
1412         int rc, i = 0;
1413         ENTRY;
1414         
1415         rc = lmv_check_connect(obd);
1416         if (rc)
1417                 RETURN(rc);
1418
1419         if (data->namelen == 0 && data->mea1 != NULL) {
1420                 /* mds asks to remove slave objects */
1421                 rc = lmv_unlink_slaves(exp, data, request);
1422                 RETURN(rc);
1423         } else if (data->namelen != 0) {
1424                 struct lmv_obj *obj;
1425                 
1426                 obj = lmv_grab_obj(obd, &data->id1);
1427                 if (obj) {
1428                         i = raw_name2idx(obj->hashtype, obj->objcount,
1429                                          data->name, data->namelen);
1430                         data->id1 = obj->objs[i].id;
1431                         lmv_put_obj(obj);
1432                 }
1433                 CDEBUG(D_OTHER, "unlink '%*s' in "DLID4" -> %u\n",
1434                        data->namelen, data->name, OLID4(&data->id1),
1435                        i);
1436         } else {
1437                 CDEBUG(D_OTHER, "drop i_nlink on "DLID4"\n",
1438                        OLID4(&data->id1));
1439         }
1440         rc = md_unlink(lmv->tgts[id_group(&data->id1)].ltd_exp, 
1441                        data, request); 
1442         RETURN(rc);
1443 }
1444
1445 struct obd_device *lmv_get_real_obd(struct obd_export *exp,
1446                                     char *name, int len)
1447 {
1448         struct obd_device *obd = exp->exp_obd;
1449         struct lmv_obd *lmv = &obd->u.lmv;
1450         int rc;
1451         ENTRY;
1452
1453         rc = lmv_check_connect(obd);
1454         if (rc)
1455                 RETURN(ERR_PTR(rc));
1456         obd = lmv->tgts[0].ltd_exp->exp_obd;
1457         EXIT;
1458         return obd;
1459 }
1460
1461 int lmv_init_ea_size(struct obd_export *exp, int easize, int cookiesize)
1462 {
1463         struct obd_device *obd = exp->exp_obd;
1464         struct lmv_obd *lmv = &obd->u.lmv;
1465         int i, rc = 0, change = 0;
1466         ENTRY;
1467
1468         if (lmv->max_easize < easize) {
1469                 lmv->max_easize = easize;
1470                 change = 1;
1471         }
1472         if (lmv->max_cookiesize < cookiesize) {
1473                 lmv->max_cookiesize = cookiesize;
1474                 change = 1;
1475         }
1476         if (change == 0)
1477                 RETURN(0);
1478         
1479         if (lmv->connected == 0)
1480                 RETURN(0);
1481
1482         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1483                 if (lmv->tgts[i].ltd_exp == NULL) {
1484                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
1485                         continue;
1486                 }
1487
1488                 rc = obd_init_ea_size(lmv->tgts[i].ltd_exp, easize, cookiesize);
1489                 if (rc) {
1490                         CERROR("obd_init_ea_size() failed on MDT target %d, "
1491                                "error %d.\n", i, rc);
1492                         break;
1493                 }
1494         }
1495         RETURN(rc);
1496 }
1497
1498 int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
1499                           struct lov_stripe_md **ea, struct obd_trans_info *oti)
1500 {
1501         struct obd_device *obd = exp->exp_obd;
1502         struct lmv_obd *lmv = &obd->u.lmv;
1503         struct lov_stripe_md obj_md;
1504         struct lov_stripe_md *obj_mdp = &obj_md;
1505         int rc = 0;
1506         ENTRY;
1507
1508         LASSERT(ea == NULL);
1509         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
1510
1511         rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp, oa, &obj_mdp, oti);
1512
1513         RETURN(rc);
1514 }
1515
1516 int lmv_getready(struct obd_export *exp)
1517 {
1518         struct obd_device *obd = exp->exp_obd;
1519         int rc = 0;
1520         
1521         ENTRY;
1522         rc = lmv_check_connect(obd);
1523         RETURN(rc);
1524 }
1525
1526 /*
1527  * to be called from MDS only.
1528  */
1529 int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
1530                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
1531 {
1532         struct obd_device *obd = exp->exp_obd;
1533         struct lmv_obd *lmv = &obd->u.lmv;
1534         struct lustre_id mid;
1535         int i, c, rc = 0;
1536         struct mea *mea;
1537         int lcount;
1538         ENTRY;
1539
1540         rc = lmv_check_connect(obd);
1541         if (rc)
1542                 RETURN(rc);
1543
1544         LASSERT(oa != NULL);
1545         
1546         if (ea == NULL) {
1547                 rc = lmv_obd_create_single(exp, oa, NULL, oti);
1548                 if (rc)
1549                         CERROR("Can't create object, rc = %d\n", rc);
1550                 RETURN(rc);
1551         }
1552
1553         if (*ea == NULL) {
1554                 rc = obd_alloc_diskmd(exp, (struct lov_mds_md **)ea);
1555                 if (rc < 0) {
1556                         CERROR("obd_alloc_diskmd() failed, error %d\n",
1557                                rc);
1558                         RETURN(rc);
1559                 }
1560                 
1561                 if (*ea == NULL)
1562                         RETURN(-ENOMEM);
1563         }
1564
1565         rc = 0;
1566         
1567         id_ino(&mid) = oa->o_id;
1568         id_fid(&mid) = oa->o_fid;
1569         id_gen(&mid) = oa->o_generation;
1570
1571         mea = (struct mea *)*ea;
1572         if (!mea->mea_count || mea->mea_count > lmv->desc.ld_tgt_count)
1573                 mea->mea_count = lmv->desc.ld_tgt_count;
1574         
1575         mea->mea_master = -1;
1576         mea->mea_magic = MEA_MAGIC_ALL_CHARS;
1577
1578         lcount = lmv->desc.ld_tgt_count;
1579         for (i = 0, c = 0; c < mea->mea_count && i < lcount; i++) {
1580                 struct lov_stripe_md obj_md;
1581                 struct lov_stripe_md *obj_mdp = &obj_md;
1582                
1583                 if (lmv->tgts[i].ltd_exp == NULL) {
1584                         /* this is master MDS */
1585                         mea->mea_master = i;
1586                         id_group(&mea->mea_ids[c]) = i;
1587                         id_ino(&mea->mea_ids[c]) = id_ino(&mid);
1588                         id_gen(&mea->mea_ids[c]) = id_gen(&mid);
1589                         id_fid(&mea->mea_ids[c]) = id_fid(&mid);
1590                         c++;
1591                         continue;
1592                 }
1593
1594                 /* "master" MDS should always be part of stripped dir, so scan
1595                  * for it. */
1596                 if (mea->mea_master == -1 && c == mea->mea_count - 1)
1597                         continue;
1598
1599                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE |
1600                         OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID;
1601
1602                 rc = obd_create(lmv->tgts[c].ltd_exp, oa, &obj_mdp, oti);
1603                 if (rc) {
1604                         CERROR("obd_create() failed on MDT target %d, "
1605                                "error %d\n", c, rc);
1606                         RETURN(rc);
1607                 }
1608
1609                 CDEBUG(D_OTHER, "dirobj at mds %d: "LPU64"/%u\n",
1610                        i, oa->o_id, oa->o_generation);
1611
1612                 /* here after object is created on desired MDS we save its fid
1613                  * to local mea_ids. */
1614                 LASSERT(oa->o_fid);
1615                 
1616                 id_group(&mea->mea_ids[c]) = i;
1617                 id_ino(&mea->mea_ids[c]) = oa->o_id;
1618                 id_fid(&mea->mea_ids[c]) = oa->o_fid;
1619                 id_gen(&mea->mea_ids[c]) = oa->o_generation;
1620                 c++;
1621         }
1622         LASSERT(c == mea->mea_count);
1623         CDEBUG(D_OTHER, "%d dirobjects created\n", (int) mea->mea_count);
1624
1625         RETURN(rc);
1626 }
1627
1628 static int lmv_llog_init(struct obd_device *obd, struct obd_llogs *llogs, 
1629                          struct obd_device *tgt, int count,
1630                          struct llog_catid *logid)
1631 {
1632         struct llog_ctxt *ctxt;
1633         int rc;
1634         ENTRY;
1635
1636         rc = obd_llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL,
1637                             &llog_client_ops);
1638         if (rc == 0) {
1639                 ctxt = llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT);
1640                 ctxt->loc_imp = tgt->u.cli.cl_import;
1641         }
1642
1643         RETURN(rc);
1644 }
1645
1646 static int lmv_llog_finish(struct obd_device *obd,
1647                            struct obd_llogs *llogs, int count)
1648 {
1649         int rc;
1650         ENTRY;
1651
1652         rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT));
1653         RETURN(rc);
1654 }
1655
1656 static int lmv_get_info(struct obd_export *exp, __u32 keylen,
1657                         void *key, __u32 *vallen, void *val)
1658 {
1659         struct obd_device *obd;
1660         struct lmv_obd *lmv;
1661         int rc = 0;
1662         ENTRY;
1663
1664         obd = class_exp2obd(exp);
1665         if (obd == NULL) {
1666                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1667                        exp->exp_handle.h_cookie);
1668                 RETURN(-EINVAL);
1669         }
1670
1671         lmv = &obd->u.lmv;
1672         if (keylen == 6 && memcmp(key, "mdsize", 6) == 0) {
1673                 __u32 *mdsize = val;
1674                 *vallen = sizeof(__u32);
1675                 *mdsize = sizeof(struct lustre_id) * lmv->desc.ld_tgt_count
1676                         + sizeof(struct mea);
1677                 RETURN(0);
1678         } else if (keylen == 6 && memcmp(key, "mdsnum", 6) == 0) {
1679                 struct obd_uuid *cluuid = &lmv->cluuid;
1680                 struct lmv_tgt_desc *tgts;
1681                 __u32 *mdsnum = val;
1682                 int i;
1683
1684                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
1685                         if (obd_uuid_equals(&tgts->uuid, cluuid)) {
1686                                 *vallen = sizeof(__u32);
1687                                 *mdsnum = i;
1688                                 RETURN(0);
1689                         }
1690                 }
1691                 LASSERT(0);
1692         } else if (keylen == 6 && memcmp(key, "rootid", 6) == 0) {
1693                 /* getting rootid from first MDS. */
1694                 rc = obd_get_info(lmv->tgts[0].ltd_exp, keylen, key,
1695                                   vallen, val);
1696                 RETURN(rc);
1697         } else if (keylen >= strlen("lmvdesc") && strcmp(key, "lmvdesc") == 0) {
1698                 struct lmv_desc *desc_ret = val;
1699                 *desc_ret = lmv->desc;
1700                 RETURN(0);
1701         }
1702
1703         CDEBUG(D_IOCTL, "invalid key\n");
1704         RETURN(-EINVAL);
1705 }
1706
1707 int lmv_set_info(struct obd_export *exp, obd_count keylen,
1708                  void *key, obd_count vallen, void *val)
1709 {
1710         struct obd_device *obd;
1711         struct lmv_obd *lmv;
1712         ENTRY;
1713
1714         obd = class_exp2obd(exp);
1715         if (obd == NULL) {
1716                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1717                        exp->exp_handle.h_cookie);
1718                 RETURN(-EINVAL);
1719         }
1720         lmv = &obd->u.lmv;
1721
1722         if (keylen >= strlen("inter_mds") && strcmp(key, "inter_mds") == 0) {
1723                 lmv->server_timeout = 1;
1724                 lmv_set_timeouts(obd);
1725                 RETURN(0);
1726         }
1727         
1728         RETURN(-EINVAL);
1729 }
1730
1731 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
1732                struct lov_stripe_md *lsm)
1733 {
1734         struct obd_device *obd = class_exp2obd(exp);
1735         struct lmv_obd *lmv = &obd->u.lmv;
1736         int mea_size;
1737         ENTRY;
1738
1739         mea_size = sizeof(struct lustre_id) * 
1740                 lmv->desc.ld_tgt_count + sizeof(struct mea);
1741         if (!lmmp)
1742                 RETURN(mea_size);
1743
1744         if (*lmmp && !lsm) {
1745                 OBD_FREE(*lmmp, mea_size);
1746                 *lmmp = NULL;
1747                 RETURN(0);
1748         }
1749
1750         if (*lmmp == NULL) {
1751                 OBD_ALLOC(*lmmp, mea_size);
1752                 if (*lmmp == NULL)
1753                         RETURN(-ENOMEM);
1754         }
1755
1756         if (!lsm)
1757                 RETURN(mea_size);
1758
1759 #warning "MEA packing/convertation must be here! -bzzz"
1760         memcpy(*lmmp, lsm, mea_size);
1761         RETURN(mea_size);
1762 }
1763
1764 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **mem_tgt,
1765                  struct lov_mds_md *disk_src, int mdsize)
1766 {
1767         struct obd_device *obd = class_exp2obd(exp);
1768         struct lmv_obd *lmv = &obd->u.lmv;
1769         struct mea **tmea = (struct mea **) mem_tgt;
1770         struct mea *mea = (void *) disk_src;
1771         int mea_size;
1772         ENTRY;
1773
1774         mea_size = sizeof(struct lustre_id) * 
1775                 lmv->desc.ld_tgt_count + sizeof(struct mea);
1776         if (mem_tgt == NULL)
1777                 return mea_size;
1778
1779         if (*mem_tgt != NULL && disk_src == NULL) {
1780                 OBD_FREE(*tmea, mea_size);
1781                 RETURN(0);
1782         }
1783
1784         LASSERT(mea_size == mdsize);
1785
1786         OBD_ALLOC(*tmea, mea_size);
1787         if (*tmea == NULL)
1788                 RETURN(-ENOMEM);
1789
1790         if (!disk_src)
1791                 RETURN(mea_size);
1792
1793 #warning "MEA unpacking/convertation must be here! -bzzz"
1794         memcpy(*tmea, mea, mdsize);
1795         RETURN(mea_size);
1796 }
1797
1798 int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa,
1799             struct lov_stripe_md *ea, obd_count oa_bufs,
1800             struct brw_page *pgarr, struct obd_trans_info *oti)
1801 {
1802         struct obd_device *obd = exp->exp_obd;
1803         struct lmv_obd *lmv = &obd->u.lmv;
1804         struct mea *mea = (struct mea *) ea;
1805         int err;
1806       
1807         LASSERT(oa != NULL);
1808         LASSERT(ea != NULL);
1809         LASSERT(pgarr != NULL);
1810         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
1811
1812         oa->o_gr = id_gen(&mea->mea_ids[oa->o_mds]);
1813         oa->o_id = id_ino(&mea->mea_ids[oa->o_mds]);
1814         oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1815         
1816         err = obd_brw(rw, lmv->tgts[oa->o_mds].ltd_exp,
1817                       oa, NULL, oa_bufs, pgarr, oti);
1818         RETURN(err);
1819 }
1820
1821 struct obd_ops lmv_obd_ops = {
1822         .o_owner                = THIS_MODULE,
1823         .o_attach               = lmv_attach,
1824         .o_detach               = lmv_detach,
1825         .o_setup                = lmv_setup,
1826         .o_cleanup              = lmv_cleanup,
1827         .o_connect              = lmv_connect,
1828         .o_disconnect           = lmv_disconnect,
1829         .o_statfs               = lmv_statfs,
1830         .o_llog_init            = lmv_llog_init,
1831         .o_llog_finish          = lmv_llog_finish,
1832         .o_get_info             = lmv_get_info,
1833         .o_set_info             = lmv_set_info,
1834         .o_create               = lmv_obd_create,
1835         .o_packmd               = lmv_packmd,
1836         .o_unpackmd             = lmv_unpackmd,
1837         .o_brw                  = lmv_brw,
1838         .o_init_ea_size         = lmv_init_ea_size,
1839         .o_notify               = lmv_notify,
1840         .o_iocontrol            = lmv_iocontrol,
1841         .o_getready             = lmv_getready,
1842 };
1843
1844 struct md_ops lmv_md_ops = {
1845         .m_getstatus            = lmv_getstatus,
1846         .m_getattr              = lmv_getattr,
1847         .m_change_cbdata        = lmv_change_cbdata,
1848         .m_change_cbdata_name   = lmv_change_cbdata_name,
1849         .m_close                = lmv_close,
1850         .m_create               = lmv_create,
1851         .m_done_writing         = lmv_done_writing,
1852         .m_enqueue              = lmv_enqueue,
1853         .m_getattr_name         = lmv_getattr_name,
1854         .m_intent_lock          = lmv_intent_lock,
1855         .m_link                 = lmv_link,
1856         .m_rename               = lmv_rename,
1857         .m_setattr              = lmv_setattr,
1858         .m_sync                 = lmv_sync,
1859         .m_readpage             = lmv_readpage,
1860         .m_unlink               = lmv_unlink,
1861         .m_get_real_obd         = lmv_get_real_obd,
1862         .m_valid_attrs          = lmv_valid_attrs,
1863         .m_delete_object        = lmv_delete_object,
1864 };
1865
1866 int __init lmv_init(void)
1867 {
1868         struct lprocfs_static_vars lvars;
1869         int rc;
1870
1871         lprocfs_init_vars(lmv, &lvars);
1872         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
1873                                  lvars.module_vars, OBD_LMV_DEVICENAME);
1874         RETURN(rc);
1875 }
1876
#ifdef __KERNEL__
/* Module exit: unregister the device type registered under
 * OBD_LMV_DEVICENAME. */
static void lmv_exit(void)
{
        class_unregister_type(OBD_LMV_DEVICENAME);
}

/* kernel module metadata and entry points */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
MODULE_LICENSE("GPL");

module_init(lmv_init);
module_exit(lmv_exit);
#endif /* __KERNEL__ */