Whamcloud - gitweb
Added the following:
[fs/lustre-release.git] / lustre / lmv / lmv_obd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #include <linux/seq_file.h>
34 #else
35 #include <liblustre.h>
36 #endif
37 #include <linux/ext2_fs.h>
38
39 #include <linux/obd_support.h>
40 #include <linux/lustre_lib.h>
41 #include <linux/lustre_net.h>
42 #include <linux/lustre_idl.h>
43 #include <linux/lustre_dlm.h>
44 #include <linux/lustre_mds.h>
45 #include <linux/obd_class.h>
46 #include <linux/obd_ost.h>
47 #include <linux/lprocfs_status.h>
48 #include <linux/lustre_fsfilt.h>
49 #include <linux/obd_lmv.h>
50 #include "lmv_internal.h"
51
52 /* Error codes:
53  *
54  *  -EINVAL  : UUID can't be found in the LMV's target list
55  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
56  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
57  */
58 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
59                               int activate)
60 {
61         struct obd_device *obd;
62         struct lmv_tgt_desc *tgt;
63         int i, rc = 0;
64         ENTRY;
65
66         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
67                lmv, uuid->uuid, activate);
68
69         spin_lock(&lmv->lmv_lock);
70         for (i = 0, tgt = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgt++) {
71                 CDEBUG(D_INFO, "lmv idx %d is %s conn "LPX64"\n",
72                        i, tgt->uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
73                 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
74                         break;
75         }
76
77         if (i == lmv->desc.ld_tgt_count)
78                 GOTO(out, rc = -EINVAL);
79
80         obd = class_exp2obd(tgt->ltd_exp);
81         if (obd == NULL)
82                 GOTO(out, rc = -ENOTCONN);
83
84         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
85                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
86                obd->obd_type->typ_name, i);
87         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
88
89         if (tgt->active == activate) {
90                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
91                        activate ? "" : "in");
92                 GOTO(out, rc);
93         }
94
95         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
96
97         tgt->active = activate;
98         if (activate)
99                 lmv->desc.ld_active_tgt_count++;
100         else
101                 lmv->desc.ld_active_tgt_count--;
102
103         EXIT;
104  out:
105         spin_unlock(&lmv->lmv_lock);
106         return rc;
107 }
108
109 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
110                       int active)
111 {
112         int rc;
113         struct obd_uuid *uuid;
114
115         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
116                 CERROR("unexpected notification of %s %s!\n",
117                        watched->obd_type->typ_name,
118                        watched->obd_name);
119                 return -EINVAL;
120         }
121         uuid = &watched->u.cli.cl_import->imp_target_uuid;
122
123         /* Set MDC as active before notifying the observer, so the
124          * observer can use the MDC normally.  
125          */
126         rc = lmv_set_mdc_active(&obd->u.lmv, uuid, active);
127         if (rc) {
128                 CERROR("%sactivation of %s failed: %d\n",
129                        active ? "" : "de", uuid->uuid, rc);
130                 RETURN(rc);
131         }
132
133         if (obd->obd_observer)
134                 /* Pass the notification up the chain. */
135                 rc = obd_notify(obd->obd_observer, watched, active);
136
137         RETURN(rc);
138 }
139
140 int lmv_attach(struct obd_device *dev, obd_count len, void *data)
141 {
142         struct lprocfs_static_vars lvars;
143         int rc;
144         ENTRY;
145
146         lprocfs_init_vars(lmv, &lvars);
147         rc = lprocfs_obd_attach(dev, lvars.obd_vars);
148         if (rc == 0) {
149 #ifdef __KERNEL__
150                 struct proc_dir_entry *entry;
151                 
152                 entry = create_proc_entry("target_obd", 0444, dev->obd_proc_entry);
153                 if (entry == NULL)
154                         RETURN(-ENOMEM);
155                 /* entry->proc_fops = &lmv_proc_target_fops; */
156                 entry->data = dev;
157 #endif
158        }
159         RETURN (rc);
160 }
161
/* Detach-time hook: hands @dev to lprocfs_obd_detach() and returns its
 * result unchanged. */
int lmv_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
166
167 /* This is fake connect function. Its purpose is to initialize lmv and 
168  * say caller that everything is okay. Real connection will be performed
169  * later. */
170 static int lmv_connect(struct lustre_handle *conn, struct obd_device *obd,
171                        struct obd_uuid *cluuid)
172 {
173         struct lmv_obd *lmv = &obd->u.lmv;
174         struct obd_export *exp;
175         int rc;
176         ENTRY;
177
178         rc = class_connect(conn, obd, cluuid);
179         if (rc) {
180                 CERROR("class_connection() returned %d\n", rc);
181                 RETURN(rc);
182         }
183
184         exp = class_conn2export(conn);
185         /* We don't want to actually do the underlying connections more than
186          * once, so keep track. */
187         lmv->refcount++;
188         if (lmv->refcount > 1) {
189                 class_export_put(exp);
190                 RETURN(0);
191         }
192
193         lmv->cluuid = *cluuid;
194         lmv->connected = 0;
195         lmv->exp = exp;
196
197         RETURN(0);
198 }
199
200 void lmv_set_timeouts(struct obd_device *obd)
201 {
202         struct lmv_tgt_desc *tgts;
203         struct lmv_obd *lmv;
204         int i;
205
206         lmv = &obd->u.lmv;
207         if (lmv->server_timeout == 0)
208                 return;
209
210         if (lmv->connected == 0)
211                 return;
212
213         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
214                 if (tgts->ltd_exp == NULL)
215                         continue;
216                 obd_set_info(tgts->ltd_exp, strlen("inter_mds"),
217                              "inter_mds", 0, NULL);
218         }
219 }
220
221 /* Performs a check if passed obd is connected. If no - connect it. */
222 int lmv_check_connect(struct obd_device *obd) {
223         struct lmv_obd *lmv = &obd->u.lmv;
224         struct obd_uuid *cluuid;
225         struct lmv_tgt_desc *tgts;
226         struct obd_export *exp;
227         int rc, rc2, i;
228
229         if (lmv->connected)
230                 return 0;
231       
232         lmv->connected = 1;
233         cluuid = &lmv->cluuid;
234         exp = lmv->exp;
235         
236         CDEBUG(D_OTHER, "time to connect %s to %s\n",
237                cluuid->uuid, obd->obd_name);
238
239         for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
240                 struct obd_device *tgt_obd;
241                 struct obd_uuid lmv_osc_uuid = { "LMV_OSC_UUID" };
242                 struct lustre_handle conn = {0, };
243
244                 LASSERT(tgts != NULL);
245
246                 tgt_obd = class_find_client_obd(&tgts->uuid, LUSTRE_MDC_NAME, 
247                                                 &obd->obd_uuid);
248                 if (!tgt_obd) {
249                         CERROR("Target %s not attached\n", tgts->uuid.uuid);
250                         GOTO(out_disc, rc = -EINVAL);
251                 }
252
253                 /* for MDS: don't connect to yourself */
254                 if (obd_uuid_equals(&tgts->uuid, cluuid)) {
255                         CDEBUG(D_OTHER, "don't connect back to %s\n",
256                                cluuid->uuid);
257                         tgts->ltd_exp = NULL;
258                         continue;
259                 }
260
261                 CDEBUG(D_OTHER, "connect to %s(%s) - %s, %s FOR %s\n",
262                         tgt_obd->obd_name, tgt_obd->obd_uuid.uuid,
263                         tgts->uuid.uuid, obd->obd_uuid.uuid,
264                         cluuid->uuid);
265
266                 if (!tgt_obd->obd_set_up) {
267                         CERROR("Target %s not set up\n", tgts->uuid.uuid);
268                         GOTO(out_disc, rc = -EINVAL);
269                 }
270                 
271                 rc = obd_connect(&conn, tgt_obd, &lmv_osc_uuid);
272                 if (rc) {
273                         CERROR("Target %s connect error %d\n",
274                                 tgts->uuid.uuid, rc);
275                         GOTO(out_disc, rc);
276                 }
277                 tgts->ltd_exp = class_conn2export(&conn);
278
279                 obd_init_ea_size(tgts->ltd_exp, lmv->max_easize,
280                                  lmv->max_cookiesize);
281                 
282                 rc = obd_register_observer(tgt_obd, obd);
283                 if (rc) {
284                         CERROR("Target %s register_observer error %d\n",
285                                tgts->uuid.uuid, rc);
286                         obd_disconnect(tgts->ltd_exp, 0);
287                         GOTO(out_disc, rc);
288                 }
289
290                 lmv->desc.ld_active_tgt_count++;
291                 tgts->active = 1;
292                 
293                 CDEBUG(D_OTHER, "connected to %s(%s) successfully (%d)\n",
294                         tgt_obd->obd_name, tgt_obd->obd_uuid.uuid,
295                         atomic_read(&obd->obd_refcount));
296         }
297
298         lmv_set_timeouts(obd);
299
300         class_export_put(exp);
301         return 0;
302
303  out_disc:
304         while (i-- > 0) {
305                 struct obd_uuid uuid;
306                 --tgts;
307                 --lmv->desc.ld_active_tgt_count;
308                 tgts->active = 0;
309                 /* save for CERROR below; (we know it's terminated) */
310                 uuid = tgts->uuid;
311                 rc2 = obd_disconnect(tgts->ltd_exp, 0);
312                 if (rc2)
313                         CERROR("error: LMV target %s disconnect on MDT idx %d: "
314                                "rc = %d\n", uuid.uuid, i, rc2);
315         }
316         class_disconnect(exp, 0);
317         RETURN (rc);
318 }
319
320 static int lmv_disconnect(struct obd_export *exp, int flags)
321 {
322         struct obd_device *obd = class_exp2obd(exp);
323         struct lmv_obd *lmv = &obd->u.lmv;
324         int rc, i;
325         ENTRY;
326
327         if (!lmv->tgts)
328                 goto out_local;
329
330         /* Only disconnect the underlying layers on the final disconnect. */
331         lmv->refcount--;
332         if (lmv->refcount != 0)
333                 goto out_local;
334
335         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
336                 if (lmv->tgts[i].ltd_exp == NULL)
337                         continue;
338
339                 if (obd->obd_no_recov) {
340                         /* Pass it on to our clients.
341                          * XXX This should be an argument to disconnect,
342                          * XXX not a back-door flag on the OBD.  Ah well.
343                          */
344                         struct obd_device *mdc_obd;
345                         mdc_obd = class_exp2obd(lmv->tgts[i].ltd_exp);
346                         if (mdc_obd)
347                                 mdc_obd->obd_no_recov = 1;
348                 }
349
350                 CDEBUG(D_OTHER, "disconnected from %s(%s) successfully\n",
351                         lmv->tgts[i].ltd_exp->exp_obd->obd_name,
352                         lmv->tgts[i].ltd_exp->exp_obd->obd_uuid.uuid);
353
354                 obd_register_observer(lmv->tgts[i].ltd_exp->exp_obd, NULL);
355
356                 rc = obd_disconnect(lmv->tgts[i].ltd_exp, flags);
357                 if (lmv->tgts[i].active) {
358                         lmv->desc.ld_active_tgt_count--;
359                         lmv->tgts[i].active = 0;
360                 }
361                 lmv->tgts[i].ltd_exp = NULL;
362         }
363
364 out_local:
365         /* This is the case when no real connection is established by
366          * lmv_check_connect(). */
367         if (!lmv->connected)
368                 class_export_put(exp);
369         rc = class_disconnect(exp, 0);
370         RETURN(rc);
371 }
372
373 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
374                          int len, void *karg, void *uarg)
375 {
376         struct obd_device *obddev = class_exp2obd(exp);
377         struct lmv_obd *lmv = &obddev->u.lmv;
378         int i, rc = 0, set = 0;
379
380         ENTRY;
381
382         if (lmv->desc.ld_tgt_count == 0)
383                 RETURN(-ENOTTY);
384         
385         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
386                 int err;
387
388                 err = obd_iocontrol(cmd, lmv->tgts[i].ltd_exp,
389                                     len, karg, uarg);
390                 if (err) {
391                         if (lmv->tgts[i].active) {
392                                 CERROR("error: iocontrol MDC %s on MDT"
393                                        "idx %d: err = %d\n",
394                                        lmv->tgts[i].uuid.uuid, i, err);
395                                 if (!rc)
396                                         rc = err;
397                         }
398                 } else
399                         set = 1;
400         }
401         if (!set && !rc)
402                 rc = -EIO;
403
404         RETURN(rc);
405 }
406
407 static int lmv_setup(struct obd_device *obd, obd_count len, void *buf)
408 {
409         struct lustre_cfg *lcfg = buf;
410         struct lmv_desc *desc;
411         struct lmv_obd *lmv = &obd->u.lmv;
412         struct obd_uuid *uuids;
413         struct lmv_tgt_desc *tgts;
414         int i;
415         int rc = 0;
416         ENTRY;
417
418         if (lcfg->lcfg_inllen1 < 1) {
419                 CERROR("LMV setup requires a descriptor\n");
420                 RETURN(-EINVAL);
421         }
422
423         if (lcfg->lcfg_inllen2 < 1) {
424                 CERROR("LMV setup requires an OST UUID list\n");
425                 RETURN(-EINVAL);
426         }
427
428         desc = (struct lmv_desc *)lcfg->lcfg_inlbuf1;
429         if (sizeof(*desc) > lcfg->lcfg_inllen1) {
430                 CERROR("descriptor size wrong: %d > %d\n",
431                        (int)sizeof(*desc), lcfg->lcfg_inllen1);
432                 RETURN(-EINVAL);
433         }
434
435         uuids = (struct obd_uuid *)lcfg->lcfg_inlbuf2;
436         if (sizeof(*uuids) * desc->ld_tgt_count != lcfg->lcfg_inllen2) {
437                 CERROR("UUID array size wrong: %u * %u != %u\n",
438                        sizeof(*uuids), desc->ld_tgt_count, lcfg->lcfg_inllen2);
439                 RETURN(-EINVAL);
440         }
441
442         lmv->bufsize = sizeof(struct lmv_tgt_desc) * desc->ld_tgt_count;
443         OBD_ALLOC(lmv->tgts, lmv->bufsize);
444         if (lmv->tgts == NULL) {
445                 CERROR("Out of memory\n");
446                 RETURN(-EINVAL);
447         }
448
449         lmv->desc = *desc;
450         spin_lock_init(&lmv->lmv_lock);
451         
452         for (i = 0, tgts = lmv->tgts; i < desc->ld_tgt_count; i++, tgts++)
453                 tgts->uuid = uuids[i];
454         
455         lmv->max_easize = sizeof(struct ll_fid) *
456                 desc->ld_tgt_count + sizeof(struct mea);
457         
458         lmv->max_cookiesize = 0;
459
460         RETURN(rc);
461 }
462
463 static int lmv_statfs(struct obd_device *obd, struct obd_statfs *osfs,
464                       unsigned long max_age)
465 {
466         struct lmv_obd *lmv = &obd->u.lmv;
467         struct obd_statfs temp;
468         int rc = 0, i;
469         ENTRY;
470         
471         rc = lmv_check_connect(obd);
472         if (rc)
473                 RETURN(rc);
474                 
475         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
476                 rc = obd_statfs(lmv->tgts[i].ltd_exp->exp_obd, &temp, max_age);
477                 if (rc) {
478                         CERROR("can't stat MDS #%d (%s)\n", i,
479                                lmv->tgts[i].ltd_exp->exp_obd->obd_name);
480                         RETURN(rc);
481                 }
482                 if (i == 0) {
483                         memcpy(osfs, &temp, sizeof(temp));
484                 } else {
485                         osfs->os_bavail += temp.os_bavail;
486                         osfs->os_blocks += temp.os_blocks;
487                         osfs->os_ffree += temp.os_ffree;
488                         osfs->os_files += temp.os_files;
489                 }
490         }
491         RETURN(rc);
492 }
493
494 static int lmv_cleanup(struct obd_device *obd, int flags) 
495 {
496         struct lmv_obd *lmv = &obd->u.lmv;
497         ENTRY;
498         lmv_cleanup_objs(obd);
499         OBD_FREE(lmv->tgts, lmv->bufsize);
500         RETURN(0);
501 }
502
503 static int lmv_getstatus(struct obd_export *exp, struct ll_fid *fid)
504 {
505         struct obd_device *obd = exp->exp_obd;
506         struct lmv_obd *lmv = &obd->u.lmv;
507         int rc;
508         ENTRY;
509         rc = lmv_check_connect(obd);
510         if (rc)
511                 RETURN(rc);
512         rc = md_getstatus(lmv->tgts[0].ltd_exp, fid);
513         fid->mds = 0;
514         RETURN(rc);
515 }
516
517 static int lmv_getattr(struct obd_export *exp, struct ll_fid *fid,
518                 unsigned long valid, unsigned int ea_size,
519                 struct ptlrpc_request **request)
520 {
521         struct obd_device *obd = exp->exp_obd;
522         struct lmv_obd *lmv = &obd->u.lmv;
523         int rc, i = fid->mds;
524         struct lmv_obj *obj;
525         ENTRY;
526         rc = lmv_check_connect(obd);
527         if (rc)
528                 RETURN(rc);
529         obj = lmv_grab_obj(obd, fid, 0);
530         CDEBUG(D_OTHER, "GETATTR for %lu/%lu/%lu %s\n",
531                (unsigned long) fid->mds,
532                (unsigned long) fid->id,
533                (unsigned long) fid->generation,
534                obj ? "(splitted)" : "");
535
536         LASSERT(fid->mds < lmv->desc.ld_tgt_count);
537         rc = md_getattr(lmv->tgts[i].ltd_exp, fid,
538                              valid, ea_size, request);
539         if (rc == 0 && obj) {
540                 /* we have to loop over dirobjs here and gather attrs
541                  * for all the slaves */
542 #warning "attrs gathering here"
543         }
544         lmv_put_obj(obj);
545         RETURN(rc);
546 }
547
548 static int lmv_change_cbdata(struct obd_export *exp,
549                                  struct ll_fid *fid, 
550                                  ldlm_iterator_t it, void *data)
551 {
552         struct obd_device *obd = exp->exp_obd;
553         struct lmv_obd *lmv = &obd->u.lmv;
554         int rc = 0;
555         ENTRY;
556         
557         rc = lmv_check_connect(obd);
558         if (rc)
559                 RETURN(rc);
560         CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu\n",
561                (unsigned long) fid->mds,
562                (unsigned long) fid->id,
563                (unsigned long) fid->generation);
564         LASSERT(fid->mds < lmv->desc.ld_tgt_count);
565         rc = md_change_cbdata(lmv->tgts[fid->mds].ltd_exp, fid, it, data);
566         RETURN(rc);
567 }
568
569 static int lmv_change_cbdata_name(struct obd_export *exp, struct ll_fid *pfid,
570                                   char *name, int len, struct ll_fid *cfid,
571                                   ldlm_iterator_t it, void *data)
572 {
573         struct obd_device *obd = exp->exp_obd;
574         struct lmv_obd *lmv = &obd->u.lmv;
575         struct lmv_obj *obj;
576         int rc = 0, mds;
577         ENTRY;
578         rc = lmv_check_connect(obd);
579         if (rc)
580                 RETURN(rc);
581         LASSERT(pfid->mds < lmv->desc.ld_tgt_count);
582         LASSERT(cfid->mds < lmv->desc.ld_tgt_count);
583         CDEBUG(D_OTHER, "CBDATA for %lu/%lu/%lu:%*s -> %lu/%lu/%lu\n",
584                (unsigned long) pfid->mds, (unsigned long) pfid->id,
585                (unsigned long) pfid->generation, len, name,
586                (unsigned long) cfid->mds, (unsigned long) cfid->id,
587                (unsigned long) cfid->generation);
588
589         /* this is default mds for directory name belongs to */
590         mds = pfid->mds;
591         obj = lmv_grab_obj(obd, pfid, 0);
592         if (obj) {
593                 /* directory is splitted. look for right mds for this name */
594                 mds = raw_name2idx(obj->objcount, name, len);
595                 lmv_put_obj(obj);
596         }
597         rc = md_change_cbdata(lmv->tgts[mds].ltd_exp, cfid, it, data);
598         RETURN(rc);
599 }
600
601 static int lmv_valid_attrs(struct obd_export *exp, struct ll_fid *fid) 
602 {
603         struct obd_device *obd = exp->exp_obd;
604         struct lmv_obd *lmv = &obd->u.lmv;
605         int rc = 0;
606         ENTRY;
607         rc = lmv_check_connect(obd);
608         if (rc)
609                 RETURN(rc);
610         CDEBUG(D_OTHER, "validate %lu/%lu/%lu\n",
611                (unsigned long) fid->mds,
612                (unsigned long) fid->id,
613                (unsigned long) fid->generation);
614         LASSERT(fid->mds < lmv->desc.ld_tgt_count);
615         rc = md_valid_attrs(lmv->tgts[fid->mds].ltd_exp, fid);
616         RETURN(rc);
617 }
618
619 int lmv_close(struct obd_export *exp, struct obdo *obdo,
620                   struct obd_client_handle *och,
621                   struct ptlrpc_request **request)
622 {
623         struct obd_device *obd = exp->exp_obd;
624         struct lmv_obd *lmv = &obd->u.lmv;
625         int rc, i = obdo->o_mds;
626         ENTRY;
627         rc = lmv_check_connect(obd);
628         if (rc)
629                 RETURN(rc);
630         LASSERT(i < lmv->desc.ld_tgt_count);
631         CDEBUG(D_OTHER, "CLOSE %lu/%lu/%lu\n", (unsigned long) obdo->o_mds,
632                (unsigned long) obdo->o_id, (unsigned long) obdo->o_generation);
633         rc = md_close(lmv->tgts[i].ltd_exp, obdo, och, request);
634         RETURN(rc);
635 }
636
637 int lmv_get_mea_and_update_object(struct obd_export *exp, struct ll_fid *fid)
638 {
639         struct obd_device *obd = exp->exp_obd;
640         struct lmv_obd *lmv = &obd->u.lmv;
641         struct ptlrpc_request *req = NULL;
642         struct lustre_md md;
643         unsigned long valid;
644         int mealen, rc;
645
646         md.mea = NULL;
647         mealen = MEA_SIZE_LMV(lmv);
648         
649         valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
650
651         /* time to update mea of parent fid */
652         rc = md_getattr(lmv->tgts[fid->mds].ltd_exp, fid,
653                         valid, mealen, &req);
654         if (rc) {
655                 CERROR("md_getattr() failed, rc = %d\n", rc);
656                 GOTO(cleanup, rc);
657         }
658
659         rc = mdc_req2lustre_md(exp, req, 0, NULL, &md);
660         if (rc) {
661                 CERROR("mdc_req2lustre_md() failed, rc = %d\n", rc);
662                 GOTO(cleanup, rc);
663         }
664
665         if (md.mea == NULL)
666                 GOTO(cleanup, rc = -ENODATA);
667
668         rc = lmv_create_obj_from_attrs(exp, fid, md.mea);
669         obd_free_memmd(exp, (struct lov_stripe_md **) &md.mea);
670
671 cleanup:
672         if (req)
673                 ptlrpc_req_finished(req);
674         RETURN(rc);
675 }
676
677 int lmv_create(struct obd_export *exp, struct mdc_op_data *op_data,
678                const void *data, int datalen, int mode, __u32 uid,
679                __u32 gid, __u64 rdev, struct ptlrpc_request **request)
680 {
681         struct obd_device *obd = exp->exp_obd;
682         struct lmv_obd *lmv = &obd->u.lmv;
683         struct mds_body *mds_body;
684         struct lmv_obj *obj;
685         int rc, mds;
686         ENTRY;
687
688         rc = lmv_check_connect(obd);
689         if (rc)
690                 RETURN(rc);
691
692         if (!lmv->desc.ld_active_tgt_count)
693                 RETURN(-EIO);
694 repeat:
695         obj = lmv_grab_obj(obd, &op_data->fid1, 0);
696         if (obj) {
697                 mds = raw_name2idx(obj->objcount, op_data->name,
698                                    op_data->namelen);
699                 op_data->fid1 = obj->objs[mds].fid;
700                 lmv_put_obj(obj);
701         }
702
703         CDEBUG(D_OTHER, "CREATE '%*s' on %lu/%lu/%lu\n",
704                         op_data->namelen, op_data->name,
705                         (unsigned long) op_data->fid1.mds,
706                         (unsigned long) op_data->fid1.id,
707                         (unsigned long) op_data->fid1.generation);
708         rc = md_create(lmv->tgts[op_data->fid1.mds].ltd_exp, op_data, data,
709                        datalen, mode, uid, gid, rdev, request);
710         if (rc == 0) {
711                 if (*request == NULL)
712                      RETURN(rc);
713                 mds_body = lustre_msg_buf((*request)->rq_repmsg, 0,
714                                           sizeof(*mds_body));
715                 LASSERT(mds_body != NULL);
716                 CDEBUG(D_OTHER, "created. id = %lu, generation = %lu, mds = %d\n",
717                        (unsigned long) mds_body->fid1.id,
718                        (unsigned long) mds_body->fid1.generation,
719                        op_data->fid1.mds);
720                 LASSERT(mds_body->valid & OBD_MD_MDS ||
721                         mds_body->mds == op_data->fid1.mds);
722         } else if (rc == -ERESTART) {
723                 /* directory got splitted. time to update local object
724                  * and repeat the request with proper MDS */
725                 rc = lmv_get_mea_and_update_object(exp, &op_data->fid1);
726                 if (rc == 0) {
727                         ptlrpc_req_finished(*request);
728                         goto repeat;
729                 }
730         }
731         RETURN(rc);
732 }
733
734 int lmv_done_writing(struct obd_export *exp, struct obdo *obdo)
735 {
736         struct obd_device *obd = exp->exp_obd;
737         struct lmv_obd *lmv = &obd->u.lmv;
738         int rc;
739         ENTRY;
740         rc = lmv_check_connect(obd);
741         if (rc)
742                 RETURN(rc);
743
744         /* FIXME: choose right MDC here */
745         CWARN("this method isn't implemented yet\n");
746         rc = md_done_writing(lmv->tgts[0].ltd_exp, obdo);
747         RETURN(rc);
748 }
749
750 int lmv_enqueue_slaves(struct obd_export *exp, int locktype,
751                          struct lookup_intent *it, int lockmode,
752                          struct mdc_op_data *data, struct lustre_handle *lockh,
753                          void *lmm, int lmmsize,
754                          ldlm_completion_callback cb_completion,
755                          ldlm_blocking_callback cb_blocking, void *cb_data)
756 {
757         struct obd_device *obd = exp->exp_obd;
758         struct lmv_obd *lmv = &obd->u.lmv;
759         struct mea *mea = data->mea1;
760         struct mdc_op_data data2;
761         int i, rc, mds;
762         ENTRY;
763
764         LASSERT(mea != NULL);
765         for (i = 0; i < mea->mea_count; i++) {
766                 if (lmv->tgts[i].ltd_exp == NULL)
767                         continue;
768
769                 memset(&data2, 0, sizeof(data2));
770                 data2.fid1 = mea->mea_fids[i];
771                 mds = data2.fid1.mds;
772                 rc = md_enqueue(lmv->tgts[mds].ltd_exp, locktype, it, lockmode,
773                                 &data2, lockh + i, lmm, lmmsize, cb_completion,
774                                 cb_blocking, cb_data);
775                 CDEBUG(D_OTHER, "take lock on slave %lu/%lu/%lu -> %d/%d\n",
776                        (unsigned long) mea->mea_fids[i].mds,
777                        (unsigned long) mea->mea_fids[i].id,
778                        (unsigned long) mea->mea_fids[i].generation,
779                        rc, it->d.lustre.it_status);
780                 if (rc)
781                         GOTO(cleanup, rc);
782                 if (it->d.lustre.it_data) {
783                         struct ptlrpc_request *req;
784                         req = (struct ptlrpc_request *) it->d.lustre.it_data;
785                         ptlrpc_req_finished(req);
786                 }
787                 
788                 if (it->d.lustre.it_status)
789                         GOTO(cleanup, rc = it->d.lustre.it_status);
790         }
791         RETURN(0);
792         
793 cleanup:
794         /* drop all taken locks */
795         while (--i >= 0) {
796                 if (lockh[i].cookie)
797                         ldlm_lock_decref(lockh + i, lockmode);
798                 lockh[i].cookie = 0;
799         }
800         RETURN(rc);
801 }
802
803 int lmv_enqueue(struct obd_export *exp, int lock_type,
804                 struct lookup_intent *it, int lock_mode,
805                 struct mdc_op_data *data, struct lustre_handle *lockh,
806                 void *lmm, int lmmsize,
807                 ldlm_completion_callback cb_completion,
808                 ldlm_blocking_callback cb_blocking, void *cb_data)
809 {
810         struct obd_device *obd = exp->exp_obd;
811         struct lmv_obd *lmv = &obd->u.lmv;
812         struct lmv_obj *obj;
813         int rc, mds;
814         ENTRY;
815
816         rc = lmv_check_connect(obd);
817         if (rc)
818                 RETURN(rc);
819
820         if (it->it_op == IT_UNLINK) {
821                 rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
822                                         data, lockh, lmm, lmmsize,
823                                         cb_completion, cb_blocking, cb_data);
824                 RETURN(rc);
825         }
826
827         if (data->namelen) {
828                 obj = lmv_grab_obj(obd, &data->fid1, 0);
829                 if (obj) {
830                         /* directory is splitted. look for
831                          * right mds for this name */
832                         mds = raw_name2idx(obj->objcount, (char *)data->name,
833                                            data->namelen);
834                         data->fid1 = obj->objs[mds].fid;
835                         lmv_put_obj(obj);
836                 }
837         }
838         CDEBUG(D_OTHER, "ENQUEUE '%s' on %lu/%lu\n",
839                LL_IT2STR(it), (unsigned long) data->fid1.id,
840                (unsigned long) data->fid1.generation);
841         rc = md_enqueue(lmv->tgts[data->fid1.mds].ltd_exp, lock_type, it,
842                         lock_mode, data, lockh, lmm, lmmsize, cb_completion,
843                         cb_blocking, cb_data);
844
845         RETURN(rc);
846 }
847
848 int lmv_getattr_name(struct obd_export *exp, struct ll_fid *fid,
849                          char *filename, int namelen, unsigned long valid,
850                          unsigned int ea_size, struct ptlrpc_request **request)
851 {
852         struct obd_device *obd = exp->exp_obd;
853         struct lmv_obd *lmv = &obd->u.lmv;
854         struct ll_fid rfid = *fid;
855         int rc, mds = fid->mds;
856         struct mds_body *body;
857         struct lmv_obj *obj;
858         ENTRY;
859         rc = lmv_check_connect(obd);
860         if (rc)
861                 RETURN(rc);
862 repeat:
863         obj = lmv_grab_obj(obd, fid, 0);
864         if (obj) {
865                 /* directory is splitted. look for right mds for this name */
866                 mds = raw_name2idx(obj->objcount, filename, namelen - 1);
867                 rfid = obj->objs[mds].fid;
868                 lmv_put_obj(obj);
869         }
870         CDEBUG(D_OTHER, "getattr_name for %*s on %lu/%lu/%lu -> %lu/%lu/%lu\n",
871                namelen, filename, (unsigned long) fid->mds,
872                (unsigned long) fid->id, (unsigned long) fid->generation,
873                (unsigned long) rfid.mds, (unsigned long) rfid.id,
874                (unsigned long) rfid.generation);
875         rc = md_getattr_name(lmv->tgts[mds].ltd_exp, &rfid, filename, namelen,
876                                   valid, ea_size, request);
877         if (rc == 0) {
878                 /* this could be cross-node reference. in this case all
879                  * we have right now is mds/ino/generation triple. we'd
880                  * like to find other attributes */
881                 body = lustre_msg_buf((*request)->rq_repmsg, 0, sizeof(*body));
882                 LASSERT(body != NULL);
883                 if (body->valid & OBD_MD_MDS) {
884                         struct ptlrpc_request *req = NULL;
885                         rfid = body->fid1;
886                         CDEBUG(D_OTHER, "request attrs for %lu/%lu/%lu\n",
887                                (unsigned long) rfid.mds,
888                                (unsigned long) rfid.id,
889                                (unsigned long) rfid.generation);
890                         rc = md_getattr_name(lmv->tgts[rfid.mds].ltd_exp, &rfid,
891                                              NULL, 1, valid, ea_size, &req);
892                         ptlrpc_req_finished(*request);
893                         *request = req;
894                 }
895         } else if (rc == -ERESTART) {
896                 /* directory got splitted. time to update local object
897                  * and repeat the request with proper MDS */
898                 rc = lmv_get_mea_and_update_object(exp, &rfid);
899                 if (rc == 0) {
900                         ptlrpc_req_finished(*request);
901                         goto repeat;
902                 }
903         }
904         RETURN(rc);
905 }
906
907
908 /*
909  * llite passes fid of an target inode in data->fid1 and
910  * fid of directory in data->fid2
911  */
912 int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
913              struct ptlrpc_request **request)
914 {
915         struct obd_device *obd = exp->exp_obd;
916         struct lmv_obd *lmv = &obd->u.lmv;
917         struct lmv_obj *obj;
918         int rc;
919         ENTRY;
920         rc = lmv_check_connect(obd);
921         if (rc)
922                 RETURN(rc);
923         if (data->namelen != 0) {
924                 /* usual link request */
925                 obj = lmv_grab_obj(obd, &data->fid1, 0);
926                 if (obj) {
927                         rc = raw_name2idx(obj->objcount, data->name,
928                                          data->namelen);
929                         data->fid1 = obj->objs[rc].fid;
930                         lmv_put_obj(obj);
931                 }
932                 CDEBUG(D_OTHER,"link %u/%u/%u:%*s to %u/%u/%u mds %d\n",
933                        (unsigned) data->fid2.mds, (unsigned) data->fid2.id,
934                        (unsigned) data->fid2.generation, data->namelen,
935                        data->name, (unsigned) data->fid1.mds,
936                        (unsigned) data->fid1.id,
937                        (unsigned) data->fid1.generation, data->fid1.mds);
938         } else {
939                 /* request from MDS to acquire i_links for inode by fid1 */
940                 CDEBUG(D_OTHER, "inc i_nlinks for %u/%u/%u\n",
941                        (unsigned) data->fid1.mds, (unsigned) data->fid1.id,
942                        (unsigned) data->fid1.generation);
943         }
944                         
945         rc = md_link(lmv->tgts[data->fid1.mds].ltd_exp, data, request);
946         RETURN(rc);
947 }
948
949 int lmv_rename(struct obd_export *exp, struct mdc_op_data *data,
950                const char *old, int oldlen, const char *new, int newlen,
951                struct ptlrpc_request **request)
952 {
953         struct obd_device *obd = exp->exp_obd;
954         struct lmv_obd *lmv = &obd->u.lmv;
955         struct lmv_obj *obj;
956         int rc, mds;
957         ENTRY;
958
959         CDEBUG(D_OTHER, "rename %*s in %lu/%lu/%lu to %*s in %lu/%lu/%lu\n",
960                oldlen, old, (unsigned long) data->fid1.mds,
961                (unsigned long) data->fid1.id,
962                (unsigned long) data->fid1.generation,
963                newlen, new, (unsigned long) data->fid2.mds,
964                (unsigned long) data->fid2.id,
965                (unsigned long) data->fid2.generation);
966         if (!fid_equal(&data->fid1, &data->fid2))
967                 CWARN("cross-node rename %lu/%lu/%lu:%*s to %lu/%lu/%lu:%*s\n",
968                       (unsigned long) data->fid1.mds,
969                       (unsigned long) data->fid1.id,
970                       (unsigned long) data->fid1.generation, oldlen, old,
971                       (unsigned long) data->fid2.mds,
972                       (unsigned long) data->fid2.id,
973                       (unsigned long) data->fid2.generation, newlen, new);
974
975         rc = lmv_check_connect(obd);
976         if (rc)
977                 RETURN(rc);
978
979         if (oldlen == 0) {
980                 /* MDS with old dir entry is asking another MDS
981                  * to create name there */
982                 CDEBUG(D_OTHER,
983                        "create %*s(%d/%d) in %lu/%lu/%lu pointing to %lu/%lu/%lu\n",
984                        newlen, new, oldlen, newlen,
985                        (unsigned long) data->fid2.mds,
986                        (unsigned long) data->fid2.id,
987                        (unsigned long) data->fid2.generation,
988                        (unsigned long) data->fid1.mds,
989                        (unsigned long) data->fid1.id,
990                        (unsigned long) data->fid1.generation);
991                 mds = data->fid2.mds;
992                 goto request;
993         }
994
995         obj = lmv_grab_obj(obd, &data->fid1, 0);
996         if (obj) {
997                 /* directory is already splitted, so we have to forward
998                  * request to the right MDS */
999                 mds = raw_name2idx(obj->objcount, (char *)old, oldlen);
1000                 data->fid1 = obj->objs[mds].fid;
1001                 CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds,
1002                        (unsigned long) obj->objs[mds].fid.mds,
1003                        (unsigned long) obj->objs[mds].fid.id,
1004                        (unsigned long) obj->objs[mds].fid.generation);
1005         }
1006         lmv_put_obj(obj);
1007
1008         obj = lmv_grab_obj(obd, &data->fid2, 0);
1009         if (obj) {
1010                 /* directory is already splitted, so we have to forward
1011                  * request to the right MDS */
1012                 mds = raw_name2idx(obj->objcount, (char *)new, newlen);
1013                 data->fid2 = obj->objs[mds].fid;
1014                 CDEBUG(D_OTHER, "forward to MDS #%u (%lu/%lu/%lu)\n", mds,
1015                        (unsigned long) obj->objs[mds].fid.mds,
1016                        (unsigned long) obj->objs[mds].fid.id,
1017                        (unsigned long) obj->objs[mds].fid.generation);
1018         }
1019         lmv_put_obj(obj);
1020         
1021         mds = data->fid1.mds;
1022
1023 request:
1024         rc = md_rename(lmv->tgts[mds].ltd_exp, data, old, oldlen,
1025                        new, newlen, request); 
1026         RETURN(rc);
1027 }
1028
1029 int lmv_setattr(struct obd_export *exp, struct mdc_op_data *data,
1030                 struct iattr *iattr, void *ea, int ealen, void *ea2, int ea2len,
1031                 struct ptlrpc_request **request)
1032 {
1033         struct obd_device *obd = exp->exp_obd;
1034         struct lmv_obd *lmv = &obd->u.lmv;
1035         int rc = 0, i = data->fid1.mds;
1036         struct ptlrpc_request *req;
1037         struct mds_body *mds_body;
1038         struct lmv_obj *obj;
1039         ENTRY;
1040
1041         rc = lmv_check_connect(obd);
1042         if (rc)
1043                 RETURN(rc);
1044
1045         obj = lmv_grab_obj(obd, &data->fid1, 0);
1046         CDEBUG(D_OTHER, "SETATTR for %lu/%lu/%lu, valid 0x%x%s\n",
1047                (unsigned long) data->fid1.mds,
1048                (unsigned long) data->fid1.id,
1049                (unsigned long) data->fid1.generation, iattr->ia_valid,
1050                obj ? ", splitted" : "");
1051         if (obj) {
1052                 for (i = 0; i < obj->objcount; i++) {
1053                         data->fid1 = obj->objs[i].fid;
1054                         rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, ea,
1055                                         ealen, ea2, ea2len, &req);
1056                         LASSERT(rc == 0);
1057                         if (fid_equal(&obj->fid, &obj->objs[i].fid)) {
1058                                 /* this is master object and this request
1059                                  * should be returned back to llite */
1060                                 *request = req;
1061                         } else {
1062                                 ptlrpc_req_finished(req);
1063                         }
1064                 }
1065                 lmv_put_obj(obj);
1066         } else {
1067                 LASSERT(data->fid1.mds < lmv->desc.ld_tgt_count);
1068                 rc = md_setattr(lmv->tgts[i].ltd_exp, data, iattr, ea, ealen,
1069                                 ea2, ea2len, request); 
1070                 if (rc == 0) {
1071                         mds_body = lustre_msg_buf((*request)->rq_repmsg, 0,
1072                                         sizeof(*mds_body));
1073                         LASSERT(mds_body != NULL);
1074                         LASSERT(mds_body->mds == i);
1075                 }
1076         }
1077         RETURN(rc);
1078 }
1079
1080 int lmv_sync(struct obd_export *exp, struct ll_fid *fid,
1081              struct ptlrpc_request **request)
1082 {
1083         struct obd_device *obd = exp->exp_obd;
1084         struct lmv_obd *lmv = &obd->u.lmv;
1085         int rc;
1086         ENTRY;
1087
1088         rc = lmv_check_connect(obd);
1089         if (rc)
1090                 RETURN(rc);
1091
1092         rc = md_sync(lmv->tgts[0].ltd_exp, fid, request); 
1093         RETURN(rc);
1094 }
1095
1096 int lmv_dirobj_blocking_ast(struct ldlm_lock *lock,
1097                             struct ldlm_lock_desc *desc, void *data, int flag)
1098 {
1099         struct lustre_handle lockh;
1100         struct lmv_obj *obj;
1101         int rc;
1102         ENTRY;
1103
1104         switch (flag) {
1105         case LDLM_CB_BLOCKING:
1106                 ldlm_lock2handle(lock, &lockh);
1107                 rc = ldlm_cli_cancel(&lockh);
1108                 if (rc < 0) {
1109                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
1110                         RETURN(rc);
1111                 }
1112                 break;
1113         case LDLM_CB_CANCELING:
1114                 /* time to drop cached attrs for dirobj */
1115                 obj = lock->l_ast_data;
1116                 if (!obj)
1117                         break;
1118
1119                 CDEBUG(D_OTHER, "cancel %s on %lu/%lu, master %lu/%lu/%lu\n",
1120                        lock->l_resource->lr_name.name[3] == 1 ?
1121                                 "LOOKUP" : "UPDATE",
1122                        (unsigned long) lock->l_resource->lr_name.name[0],
1123                        (unsigned long) lock->l_resource->lr_name.name[1],
1124                        (unsigned long) obj->fid.mds,
1125                        (unsigned long) obj->fid.id,
1126                        (unsigned long) obj->fid.generation);
1127                 break;
1128         default:
1129                 LBUG();
1130         }
1131         RETURN(0);
1132 }
1133
1134 void lmv_remove_dots(struct page *page)
1135 {
1136         char *kaddr = page_address(page);
1137         unsigned limit = PAGE_CACHE_SIZE;
1138         unsigned offs, rec_len;
1139         struct ext2_dir_entry_2 *p;
1140
1141         for (offs = 0; offs <= limit - EXT2_DIR_REC_LEN(1); offs += rec_len) {
1142                 p = (struct ext2_dir_entry_2 *)(kaddr + offs);
1143                 rec_len = le16_to_cpu(p->rec_len);
1144
1145                 if ((p->name_len == 1 && p->name[0] == '.') ||
1146                     (p->name_len == 2 && p->name[0] == '.' && p->name[1] == '.'))
1147                         p->inode = 0;
1148         }
1149 }
1150
1151 int lmv_readpage(struct obd_export *exp, struct ll_fid *mdc_fid,
1152                  __u64 offset, struct page *page,
1153                  struct ptlrpc_request **request)
1154 {
1155         struct obd_device *obd = exp->exp_obd;
1156         struct lmv_obd *lmv = &obd->u.lmv;
1157         struct ll_fid rfid = *mdc_fid;
1158         struct lmv_obj *obj;
1159         int rc, i;
1160         ENTRY;
1161
1162         rc = lmv_check_connect(obd);
1163         if (rc)
1164                 RETURN(rc);
1165
1166         LASSERT(mdc_fid->mds < lmv->desc.ld_tgt_count);
1167         CDEBUG(D_OTHER, "READPAGE at %llu from %lu/%lu/%lu\n",
1168                offset, (unsigned long) rfid.mds,
1169                (unsigned long) rfid.id,
1170                (unsigned long) rfid.generation);
1171
1172         obj = lmv_grab_obj(obd, mdc_fid, 0);
1173         if (obj) {
1174                 /* find dirobj containing page with requested offset */
1175                 /* FIXME: what about protecting cached attrs here? */
1176                 for (i = 0; i < obj->objcount; i++) {
1177                         if (offset < obj->objs[i].size)
1178                                 break;
1179                         offset -= obj->objs[i].size;
1180                 }
1181                 rfid = obj->objs[i].fid;
1182                 CDEBUG(D_OTHER, "forward to %lu/%lu/%lu with offset %lu\n",
1183                        (unsigned long) rfid.mds,
1184                        (unsigned long) rfid.id,
1185                        (unsigned long) rfid.generation,
1186                        (unsigned long) offset);
1187         }
1188         rc = md_readpage(lmv->tgts[rfid.mds].ltd_exp, &rfid, offset, page, request);
1189         if (rc == 0 && !fid_equal(&rfid, mdc_fid)) {
1190                 /* this page isn't from master object. to avoid
1191                  * ./.. duplication in directory, we have to remove them
1192                  * from all slave objects */
1193                 lmv_remove_dots(page);
1194         }
1195       
1196         lmv_put_obj(obj);
1197
1198         RETURN(rc);
1199 }
1200
1201 int lmv_unlink_slaves(struct obd_export *exp, struct mdc_op_data *data,
1202                       struct ptlrpc_request **req)
1203 {
1204         struct obd_device *obd = exp->exp_obd;
1205         struct lmv_obd *lmv = &obd->u.lmv;
1206         struct mea *mea = data->mea1;
1207         struct mdc_op_data data2;
1208         int i, rc = 0, mds;
1209         ENTRY;
1210
1211         LASSERT(mea != NULL);
1212         for (i = 0; i < mea->mea_count; i++) {
1213                 if (lmv->tgts[i].ltd_exp == NULL)
1214                         continue;
1215
1216                 memset(&data2, 0, sizeof(data2));
1217                 data2.fid1 = mea->mea_fids[i];
1218                 data2.create_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
1219                 mds = data2.fid1.mds;
1220                 rc = md_unlink(lmv->tgts[mds].ltd_exp, &data2, req);
1221                 CDEBUG(D_OTHER, "unlink slave %lu/%lu/%lu -> %d\n",
1222                        (unsigned long) mea->mea_fids[i].mds,
1223                        (unsigned long) mea->mea_fids[i].id,
1224                        (unsigned long) mea->mea_fids[i].generation, rc);
1225                 if (*req) {
1226                         ptlrpc_req_finished(*req);
1227                         *req = NULL;
1228                 }
1229                 if (rc)
1230                         break;
1231         }
1232         RETURN(rc);
1233 }
1234
1235 int lmv_unlink(struct obd_export *exp, struct mdc_op_data *data,
1236                struct ptlrpc_request **request)
1237 {
1238         struct obd_device *obd = exp->exp_obd;
1239         struct lmv_obd *lmv = &obd->u.lmv;
1240         int rc, i = 0;
1241         ENTRY;
1242         rc = lmv_check_connect(obd);
1243         if (rc)
1244                 RETURN(rc);
1245
1246         if (data->namelen == 0 && data->mea1 != NULL) {
1247                 /* mds asks to remove slave objects */
1248                 rc = lmv_unlink_slaves(exp, data, request);
1249                 RETURN(rc);
1250         } else if (data->namelen != 0) {
1251                 struct lmv_obj *obj;
1252                 obj = lmv_grab_obj(obd, &data->fid1, 0);
1253                 if (obj) {
1254                         i = raw_name2idx(obj->objcount, data->name,
1255                                          data->namelen);
1256                         data->fid1 = obj->objs[i].fid;
1257                         lmv_put_obj(obj);
1258                 }
1259                 CDEBUG(D_OTHER, "unlink '%*s' in %lu/%lu/%lu -> %u\n",
1260                        data->namelen, data->name,
1261                        (unsigned long) data->fid1.mds,
1262                        (unsigned long) data->fid1.id,
1263                        (unsigned long) data->fid1.generation, i);
1264         } else {
1265                 CDEBUG(D_OTHER, "drop i_nlink on %lu/%lu/%lu\n",
1266                        (unsigned long) data->fid1.mds,
1267                        (unsigned long) data->fid1.id,
1268                        (unsigned long) data->fid1.generation);
1269         }
1270         rc = md_unlink(lmv->tgts[data->fid1.mds].ltd_exp, data, request); 
1271         RETURN(rc);
1272 }
1273
1274 struct obd_device *lmv_get_real_obd(struct obd_export *exp,
1275                                     char *name, int len)
1276 {
1277         struct obd_device *obd = exp->exp_obd;
1278         struct lmv_obd *lmv = &obd->u.lmv;
1279         int rc;
1280         ENTRY;
1281
1282         rc = lmv_check_connect(obd);
1283         if (rc)
1284                 RETURN(ERR_PTR(rc));
1285         obd = lmv->tgts[0].ltd_exp->exp_obd;
1286         EXIT;
1287         return obd;
1288 }
1289
1290 int lmv_init_ea_size(struct obd_export *exp, int easize, int cookiesize)
1291 {
1292         struct obd_device *obd = exp->exp_obd;
1293         struct lmv_obd *lmv = &obd->u.lmv;
1294         int i, rc = 0, change = 0;
1295         ENTRY;
1296
1297         if (lmv->max_easize < easize) {
1298                 lmv->max_easize = easize;
1299                 change = 1;
1300         }
1301         if (lmv->max_cookiesize < cookiesize) {
1302                 lmv->max_cookiesize = cookiesize;
1303                 change = 1;
1304         }
1305         if (change == 0)
1306                 RETURN(0);
1307         
1308         if (lmv->connected == 0)
1309                 RETURN(0);
1310
1311         /* FIXME: error handling? */
1312         for (i = 0; i < lmv->desc.ld_tgt_count; i++)
1313                 rc = obd_init_ea_size(lmv->tgts[i].ltd_exp, easize, cookiesize);
1314         RETURN(rc);
1315 }
1316
1317 int lmv_obd_create_single(struct obd_export *exp, struct obdo *oa,
1318                           struct lov_stripe_md **ea, struct obd_trans_info *oti)
1319 {
1320         struct obd_device *obd = exp->exp_obd;
1321         struct lmv_obd *lmv = &obd->u.lmv;
1322         struct lov_stripe_md obj_md;
1323         struct lov_stripe_md *obj_mdp = &obj_md;
1324         int rc = 0;
1325         ENTRY;
1326
1327         rc = lmv_check_connect(obd);
1328         if (rc)
1329                 RETURN(rc);
1330
1331         LASSERT(ea == NULL);
1332         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
1333
1334         rc = obd_create(lmv->tgts[oa->o_mds].ltd_exp, oa, &obj_mdp, oti);
1335         LASSERT(rc == 0);
1336
1337         RETURN(rc);
1338 }
1339
1340 /*
1341  * to be called from MDS only
1342  */
1343 int lmv_obd_create(struct obd_export *exp, struct obdo *oa,
1344                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
1345 {
1346         struct obd_device *obd = exp->exp_obd;
1347         struct lmv_obd *lmv = &obd->u.lmv;
1348         struct mea *mea;
1349         int i, c, rc = 0;
1350         struct ll_fid mfid;
1351         ENTRY;
1352
1353         rc = lmv_check_connect(obd);
1354         if (rc)
1355                 RETURN(rc);
1356
1357         LASSERT(oa != NULL);
1358         
1359         if (ea == NULL) {
1360                 rc = lmv_obd_create_single(exp, oa, NULL, oti);
1361                 RETURN(rc);
1362         }
1363
1364         if (*ea == NULL) {
1365                 rc = obd_alloc_diskmd(exp, (struct lov_mds_md **)ea);
1366                 LASSERT(*ea != NULL);
1367         }
1368
1369         mea = (struct mea *)*ea;
1370         mfid.id = oa->o_id;
1371         mfid.generation = oa->o_generation;
1372         rc = 0;
1373         if (!mea->mea_count || mea->mea_count > lmv->desc.ld_tgt_count)
1374                 mea->mea_count = lmv->desc.ld_tgt_count;
1375
1376         mea->mea_master = -1;
1377         
1378         /* FIXME: error handling? */
1379         for (i = 0, c = 0; c < mea->mea_count && 
1380                 i < lmv->desc.ld_tgt_count; i++) {
1381                 struct lov_stripe_md obj_md;
1382                 struct lov_stripe_md *obj_mdp = &obj_md;
1383                
1384                 if (lmv->tgts[i].ltd_exp == NULL) {
1385                         /* this is master MDS */
1386                         mea->mea_fids[c].id = mfid.id;
1387                         mea->mea_fids[c].generation = mfid.generation;
1388                         mea->mea_fids[c].mds = i;
1389                         mea->mea_master = i;
1390                         c++;
1391                         continue;
1392                 }
1393
1394                 /* "Master" MDS should always be part of stripped dir, so
1395                    scan for it */
1396                 if (mea->mea_master == -1 && c == mea->mea_count - 1)
1397                         continue;
1398
1399                 oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLTYPE | OBD_MD_FLMODE
1400                                 | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLID;
1401
1402                 rc = obd_create(lmv->tgts[c].ltd_exp, oa, &obj_mdp, oti);
1403                 /* FIXME: error handling here */
1404                 LASSERT(rc == 0);
1405
1406                 mea->mea_fids[c].id = oa->o_id;
1407                 mea->mea_fids[c].generation = oa->o_generation;
1408                 mea->mea_fids[c].mds = i;
1409                 c++;
1410                 CDEBUG(D_OTHER, "dirobj at mds %d: "LPU64"/%u\n",
1411                        i, oa->o_id, oa->o_generation);
1412         }
1413         LASSERT(c == mea->mea_count);
1414         CDEBUG(D_OTHER, "%d dirobjects created\n", (int) mea->mea_count);
1415
1416         RETURN(rc);
1417 }
1418
1419 static int lmv_get_info(struct obd_export *exp, __u32 keylen,
1420                         void *key, __u32 *vallen, void *val)
1421 {
1422         struct obd_device *obd;
1423         struct lmv_obd *lmv;
1424         ENTRY;
1425
1426         obd = class_exp2obd(exp);
1427         if (obd == NULL) {
1428                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1429                        exp->exp_handle.h_cookie);
1430                 RETURN(-EINVAL);
1431         }
1432
1433         lmv = &obd->u.lmv;
1434         if (keylen == 6 && memcmp(key, "mdsize", 6) == 0) {
1435                 __u32 *mdsize = val;
1436                 *vallen = sizeof(__u32);
1437                 *mdsize = sizeof(struct ll_fid) * lmv->desc.ld_tgt_count
1438                                 + sizeof(struct mea);
1439                 RETURN(0);
1440         } else if (keylen == 6 && memcmp(key, "mdsnum", 6) == 0) {
1441                 struct obd_uuid *cluuid = &lmv->cluuid;
1442                 struct lmv_tgt_desc *tgts;
1443                 __u32 *mdsnum = val;
1444                 int i;
1445
1446                 for (i = 0, tgts = lmv->tgts; i < lmv->desc.ld_tgt_count; i++, tgts++) {
1447                         if (obd_uuid_equals(&tgts->uuid, cluuid)) {
1448                                 *vallen = sizeof(__u32);
1449                                 *mdsnum = i;
1450                                 RETURN(0);
1451                         }
1452                 }
1453                 LASSERT(0);
1454         }
1455
1456         CDEBUG(D_IOCTL, "invalid key\n");
1457         RETURN(-EINVAL);
1458 }
1459
1460 int lmv_set_info(struct obd_export *exp, obd_count keylen,
1461                  void *key, obd_count vallen, void *val)
1462 {
1463         struct obd_device *obd;
1464         struct lmv_obd *lmv;
1465         ENTRY;
1466
1467         obd = class_exp2obd(exp);
1468         if (obd == NULL) {
1469                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1470                        exp->exp_handle.h_cookie);
1471                 RETURN(-EINVAL);
1472         }
1473         lmv = &obd->u.lmv;
1474
1475         if (keylen >= strlen("client") && strcmp(key, "client") == 0) {
1476                 struct lmv_tgt_desc *tgts;
1477                 int i, rc;
1478
1479                 rc = lmv_check_connect(obd);
1480                 if (rc)
1481                         RETURN(rc);
1482
1483                 for (i = 0, tgts = lmv->tgts; 
1484                         i < lmv->desc.ld_tgt_count; i++, tgts++) {
1485                         rc = obd_set_info(tgts->ltd_exp, keylen, key, vallen, val);
1486                         if (rc)
1487                                 RETURN(rc);
1488                 }
1489                 RETURN(0);
1490         } else if (keylen >= strlen("inter_mds") && strcmp(key, "inter_mds") == 0) {
1491                 lmv->server_timeout = 1;
1492                 lmv_set_timeouts(obd);
1493                 RETURN(0);
1494         }
1495         
1496         RETURN(-EINVAL);
1497 }
1498
1499 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
1500                struct lov_stripe_md *lsm)
1501 {
1502         struct obd_device *obd = class_exp2obd(exp);
1503         struct lmv_obd *lmv = &obd->u.lmv;
1504         int mea_size;
1505         ENTRY;
1506
1507         mea_size = sizeof(struct ll_fid) * 
1508                 lmv->desc.ld_tgt_count + sizeof(struct mea);
1509         if (!lmmp)
1510                 RETURN(mea_size);
1511
1512         if (*lmmp && !lsm) {
1513                 OBD_FREE(*lmmp, mea_size);
1514                 *lmmp = NULL;
1515                 RETURN(0);
1516         }
1517
1518         if (!*lmmp) {
1519                 OBD_ALLOC(*lmmp, mea_size);
1520                 if (!*lmmp)
1521                         RETURN(-ENOMEM);
1522         }
1523
1524         if (!lsm)
1525                 RETURN(mea_size);
1526
1527 #warning "MEA packing/convertation must be here! -bzzz"
1528         memcpy(*lmmp, lsm, mea_size);
1529         RETURN(mea_size);
1530 }
1531
1532 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **mem_tgt,
1533                         struct lov_mds_md *disk_src, int mdsize)
1534 {
1535         struct obd_device *obd = class_exp2obd(exp);
1536         struct lmv_obd *lmv = &obd->u.lmv;
1537         struct mea **tmea = (struct mea **) mem_tgt;
1538         struct mea *mea = (void *) disk_src;
1539         int mea_size;
1540         ENTRY;
1541
1542         mea_size = sizeof(struct ll_fid) * 
1543                 lmv->desc.ld_tgt_count + sizeof(struct mea);
1544         if (mem_tgt == NULL)
1545                 return mea_size;
1546
1547         if (*mem_tgt != NULL && disk_src == NULL) {
1548                 OBD_FREE(*tmea, mea_size);
1549                 RETURN(0);
1550         }
1551
1552         LASSERT(mea_size == mdsize);
1553
1554         OBD_ALLOC(*tmea, mea_size);
1555         /* FIXME: error handling here */
1556         LASSERT(*tmea != NULL);
1557
1558         if (!disk_src)
1559                 RETURN(mea_size);
1560
1561 #warning "MEA unpacking/convertation must be here! -bzzz"
1562         memcpy(*tmea, mea, mdsize);
1563         RETURN(mea_size);
1564 }
1565
1566 int lmv_brw(int rw, struct obd_export *exp, struct obdo *oa,
1567                 struct lov_stripe_md *ea, obd_count oa_bufs,
1568                 struct brw_page *pgarr, struct obd_trans_info *oti)
1569 {
1570         struct obd_device *obd = exp->exp_obd;
1571         struct lmv_obd *lmv = &obd->u.lmv;
1572         struct mea *mea = (struct mea *) ea;
1573         int err;
1574       
1575         LASSERT(oa != NULL);
1576         LASSERT(ea != NULL);
1577         LASSERT(pgarr != NULL);
1578         LASSERT(oa->o_mds < lmv->desc.ld_tgt_count);
1579
1580         oa->o_gr = mea->mea_fids[oa->o_mds].generation;
1581         oa->o_id = mea->mea_fids[oa->o_mds].id;
1582         oa->o_valid =  OBD_MD_FLID | OBD_MD_FLGROUP;
1583         err = obd_brw(rw, lmv->tgts[oa->o_mds].ltd_exp, oa,
1584                       NULL, oa_bufs, pgarr, oti);
1585         RETURN(err);
1586 }
1587
1588 struct obd_ops lmv_obd_ops = {
1589         .o_owner                = THIS_MODULE,
1590         .o_attach               = lmv_attach,
1591         .o_detach               = lmv_detach,
1592         .o_setup                = lmv_setup,
1593         .o_cleanup              = lmv_cleanup,
1594         .o_connect              = lmv_connect,
1595         .o_disconnect           = lmv_disconnect,
1596         .o_statfs               = lmv_statfs,
1597         .o_get_info             = lmv_get_info,
1598         .o_set_info             = lmv_set_info,
1599         .o_create               = lmv_obd_create,
1600         .o_packmd               = lmv_packmd,
1601         .o_unpackmd             = lmv_unpackmd,
1602         .o_brw                  = lmv_brw,
1603         .o_init_ea_size         = lmv_init_ea_size,
1604         .o_notify               = lmv_notify,
1605         .o_iocontrol            = lmv_iocontrol,
1606 };
1607
1608 struct md_ops lmv_md_ops = {
1609         .m_getstatus            = lmv_getstatus,
1610         .m_getattr              = lmv_getattr,
1611         .m_change_cbdata        = lmv_change_cbdata,
1612         .m_change_cbdata_name   = lmv_change_cbdata_name,
1613         .m_close                = lmv_close,
1614         .m_create               = lmv_create,
1615         .m_done_writing         = lmv_done_writing,
1616         .m_enqueue              = lmv_enqueue,
1617         .m_getattr_name         = lmv_getattr_name,
1618         .m_intent_lock          = lmv_intent_lock,
1619         .m_link                 = lmv_link,
1620         .m_rename               = lmv_rename,
1621         .m_setattr              = lmv_setattr,
1622         .m_sync                 = lmv_sync,
1623         .m_readpage             = lmv_readpage,
1624         .m_unlink               = lmv_unlink,
1625         .m_get_real_obd         = lmv_get_real_obd,
1626         .m_valid_attrs          = lmv_valid_attrs,
1627 };
1628
1629 int __init lmv_init(void)
1630 {
1631         struct lprocfs_static_vars lvars;
1632         int rc;
1633
1634         lprocfs_init_vars(lmv, &lvars);
1635         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
1636                                  lvars.module_vars, OBD_LMV_DEVICENAME);
1637         RETURN(rc);
1638 }
1639
#ifdef __KERNEL__
/*
 * Module exit point: unregister the device type that lmv_init()
 * registered under OBD_LMV_DEVICENAME.
 */
static void lmv_exit(void)
{
        class_unregister_type(OBD_LMV_DEVICENAME);
}

/* kernel module glue, only built when compiled for the kernel */
module_init(lmv_init);
module_exit(lmv_exit);

MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
#endif