Whamcloud - gitweb
Merge b_md to HEAD for 0.5.19 release.
[fs/lustre-release.git] / lustre / lov / lov_obd.c
1  /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lov/lov.c
5  *
6  * Copyright (C) 2002 Cluster File Systems, Inc.
7  * Author: Phil Schwan <phil@off.net>
8  *         Peter Braam <braam@clusterfs.com>
9  *         Mike Shaver <shaver@off.net>
10  *
11  * This code is issued under the GNU General Public License.
12  * See the file COPYING in this distribution
13  */
14
15 #define EXPORT_SYMTAB
16 #define DEBUG_SUBSYSTEM S_LOV
17
18 #include <linux/slab.h>
19 #include <linux/module.h>
20 #include <linux/obd_support.h>
21 #include <linux/lustre_lib.h>
22 #include <linux/lustre_net.h>
23 #include <linux/lustre_idl.h>
24 #include <linux/lustre_lite.h> /* for LL_IOC_LOV_[GS]ETSTRIPE */
25 #include <linux/lustre_mds.h>
26 #include <linux/obd_class.h>
27 #include <linux/obd_lov.h>
28 #include <linux/init.h>
29 #include <linux/random.h>
30 #include <linux/slab.h>
31 #include <asm/div64.h>
32 #include <linux/lprocfs_status.h>
33
34 extern struct lprocfs_vars status_var_nm_1[]; /* lprocfs table that lov_attach() hands to lprocfs_reg_obd() */
35 extern struct lprocfs_vars status_class_var[]; /* second lprocfs table; defined outside this file */
36
37 static kmem_cache_t *lov_file_cache; /* slab cache that struct lov_file_handles are allocated from and validated against */
38
39 struct lov_file_handles { /* per-open state built by lov_open(); its address and cookie are handed back in the obdo handle */
40         struct list_head lfh_list; /* linkage on the export's exp_lov_data.led_open_head list */
41         __u64 lfh_cookie; /* random value; lov_handle2lfh() rejects handles whose cookie differs */
42         int lfh_count; /* number of entries in lfh_handles (set to the stripe count at open) */
43         struct lustre_handle *lfh_handles; /* one handle per stripe, indexed by stripe number */
44 };
45
46 struct lov_lock_handles { /* NOTE(review): no user is visible in this part of the file; presumably the lock counterpart of lov_file_handles - confirm */
47         __u64 llh_cookie; /* NOTE(review): presumably validated like lfh_cookie - confirm against users */
48         struct lustre_handle llh_handles[0]; /* GNU zero-length trailing array: element storage must be allocated together with the header */
49 };
50
51 extern int lov_packmd(struct lustre_handle *conn, struct lov_mds_md **lmm,
52                        struct lov_stripe_md *lsm); /* defined outside this file; NOTE(review): by its signature converts an lsm into an lmm - confirm */
53 extern int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsm,
54                          struct lov_mds_md *lmm); /* defined outside this file; NOTE(review): by its signature the inverse of lov_packmd - confirm */
55 extern int lov_setstripe(struct lustre_handle *conn,
56                          struct lov_stripe_md **lsmp, struct lov_mds_md *lmmu); /* defined outside this file; see LL_IOC_LOV_SETSTRIPE note on the lustre_lite.h include */
57 extern int lov_getstripe(struct lustre_handle *conn, struct lov_mds_md *lmmu,
58                          struct lov_stripe_md *lsm); /* defined outside this file; see LL_IOC_LOV_GETSTRIPE note on the lustre_lite.h include */
59
60 /* obd methods */
61 int lov_attach(struct obd_device *dev, obd_count len, void *data)
62 {
63         return lprocfs_reg_obd(dev, status_var_nm_1, dev);
64 }
65
/* Detach hook: undo lov_attach() by removing @dev's lprocfs registration. */
int lov_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_dereg_obd(dev);
        return rc;
}
70
71 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
72                        obd_uuid_t cluuid, struct recovd_obd *recovd,
73                        ptlrpc_recovery_cb_t recover)
74 {
75         struct ptlrpc_request *req = NULL;
76         struct lov_obd *lov = &obd->u.lov;
77         struct client_obd *mdc = &lov->mdcobd->u.cli;
78         struct lov_desc *desc = &lov->desc;
79         struct obd_export *exp;
80         struct lustre_handle mdc_conn;
81         obd_uuid_t *uuidarray;
82         int rc, rc2, i;
83         ENTRY;
84
85         rc = class_connect(conn, obd, cluuid);
86         if (rc)
87                 RETURN(rc);
88
89         /* We don't want to actually do the underlying connections more than
90          * once, so keep track. */
91         lov->refcount++;
92         if (lov->refcount > 1)
93                 RETURN(0);
94
95         exp = class_conn2export(conn);
96         spin_lock_init(&exp->exp_lov_data.led_lock);
97         INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
98
99         /* retrieve LOV metadata from MDS */
100         rc = obd_connect(&mdc_conn, lov->mdcobd, NULL, recovd, recover);
101         if (rc) {
102                 CERROR("cannot connect to mdc: rc = %d\n", rc);
103                 GOTO(out_conn, rc);
104         }
105
106         rc = mdc_getlovinfo(obd, &mdc_conn, &req);
107         rc2 = obd_disconnect(&mdc_conn);
108         if (rc) {
109                 CERROR("cannot get lov info %d\n", rc);
110                 GOTO(out_conn, rc);
111         }
112
113         if (rc2) {
114                 CERROR("error disconnecting from MDS %d\n", rc2);
115                 GOTO(out_conn, rc = rc2);
116         }
117
118         /* sanity... */
119         if (req->rq_repmsg->bufcount < 2 ||
120             req->rq_repmsg->buflens[0] < sizeof(*desc)) {
121                 CERROR("LOV desc: invalid descriptor returned\n");
122                 GOTO(out_conn, rc = -EINVAL);
123         }
124
125         memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
126         lov_unpackdesc(desc);
127
128         if (req->rq_repmsg->buflens[1] < sizeof(*uuidarray)*desc->ld_tgt_count){
129                 CERROR("LOV desc: invalid uuid array returned\n");
130                 GOTO(out_conn, rc = -EINVAL);
131         }
132
133         if (memcmp(obd->obd_uuid, desc->ld_uuid, sizeof(desc->ld_uuid))) {
134                 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
135                        obd->obd_uuid, desc->ld_uuid);
136                 GOTO(out_conn, rc = -EINVAL);
137         }
138
139         if (desc->ld_tgt_count > 1000) {
140                 CERROR("LOV desc: target count > 1000 (%d)\n",
141                        desc->ld_tgt_count);
142                 GOTO(out_conn, rc = -EINVAL);
143         }
144
145         /* Because of 64-bit divide/mod operations only work with a 32-bit
146          * divisor in a 32-bit kernel, we cannot support a stripe width
147          * of 4GB or larger on 32-bit CPUs.
148          */
149         if ((desc->ld_default_stripe_count ?
150              desc->ld_default_stripe_count : desc->ld_tgt_count) *
151              desc->ld_default_stripe_size > ~0UL) {
152                 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
153                        desc->ld_default_stripe_size,
154                        desc->ld_default_stripe_count ?
155                        desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
156                 GOTO(out_conn, rc = -EINVAL);
157         }
158
159         lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
160         OBD_ALLOC(lov->tgts, lov->bufsize);
161         if (!lov->tgts) {
162                 CERROR("Out of memory\n");
163                 GOTO(out_conn, rc = -ENOMEM);
164         }
165
166         uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
167         for (i = 0; i < desc->ld_tgt_count; i++)
168                 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(*uuidarray));
169
170         for (i = 0; i < desc->ld_tgt_count; i++) {
171                 struct obd_device *tgt = client_tgtuuid2obd(uuidarray[i]);
172
173                 if (!tgt) {
174                         CERROR("Target %s not attached\n", uuidarray[i]);
175                         GOTO(out_disc, rc = -EINVAL);
176                 }
177
178                 if (!(tgt->obd_flags & OBD_SET_UP)) {
179                         CERROR("Target %s not set up\n", uuidarray[i]);
180                         GOTO(out_disc, rc = -EINVAL);
181                 }
182
183                 rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd,
184                                  recover);
185
186                 if (rc) {
187                         CERROR("Target %s connect error %d\n", uuidarray[i],
188                                rc);
189                         GOTO(out_disc, rc);
190                 }
191                         
192                 rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
193                                     sizeof(struct obd_device *), obd, NULL);
194                 if (rc) {
195                         CERROR("Target %s REGISTER_LOV error %d\n",
196                                uuidarray[i], rc);
197                         GOTO(out_disc, rc);
198                 }
199
200                 desc->ld_active_tgt_count++;
201                 lov->tgts[i].active = 1;
202         }
203
204         mdc->cl_max_mds_easize = obd_size_wiremd(conn, NULL);
205
206  out:
207         ptlrpc_req_finished(req);
208         RETURN(rc);
209
210  out_disc:
211         i--; /* skip failed-connect OSC */
212         while (i-- > 0) {
213                 desc->ld_active_tgt_count--;
214                 lov->tgts[i].active = 0;
215                 rc2 = obd_disconnect(&lov->tgts[i].conn);
216                 if (rc2)
217                         CERROR("LOV Target %s disconnect error: rc = %d\n",
218                                 uuidarray[i], rc2);
219         }
220         OBD_FREE(lov->tgts, lov->bufsize);
221  out_conn:
222         class_disconnect(conn);
223         goto out;
224 }
225
226 static int lov_disconnect(struct lustre_handle *conn)
227 {
228         struct obd_device *obd = class_conn2obd(conn);
229         struct lov_obd *lov = &obd->u.lov;
230         struct obd_export *exp;
231         struct list_head *p, *n;
232         int rc, i;
233
234         if (!lov->tgts)
235                 goto out_local;
236
237         /* Only disconnect the underlying layers on the final disconnect. */
238         lov->refcount--;
239         if (lov->refcount != 0)
240                 goto out_local;
241
242         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
243                 rc = obd_disconnect(&lov->tgts[i].conn);
244                 if (rc) {
245                         if (lov->tgts[i].active) {
246                                 CERROR("Target %s disconnect error %d\n",
247                                        lov->tgts[i].uuid, rc);
248                         }
249                         rc = 0;
250                 }
251                 if (lov->tgts[i].active) {
252                         lov->desc.ld_active_tgt_count--;
253                         lov->tgts[i].active = 0;
254                 }
255         }
256         OBD_FREE(lov->tgts, lov->bufsize);
257         lov->bufsize = 0;
258         lov->tgts = NULL;
259
260         exp = class_conn2export(conn);
261         spin_lock(&exp->exp_lov_data.led_lock);
262         list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
263                 /* XXX close these, instead of just discarding them? */
264                 struct lov_file_handles *lfh;
265                 lfh = list_entry(p, typeof(*lfh), lfh_list);
266                 CERROR("discarding open LOV handle %p:"LPX64"\n",
267                        lfh, lfh->lfh_cookie);
268                 list_del(&lfh->lfh_list);
269                 OBD_FREE(lfh->lfh_handles,
270                          lfh->lfh_count * sizeof(*lfh->lfh_handles));
271                 kmem_cache_free(lov_file_cache, lfh);
272         }
273         spin_unlock(&exp->exp_lov_data.led_lock);
274
275  out_local:
276         rc = class_disconnect(conn);
277         return rc;
278 }
279
280 /* Error codes:
281  *
282  *  -EINVAL  : UUID can't be found in the LOV's target list
283  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
284  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
285  *  -EALREADY: The OSC is already marked (in)active
286  */
287 static int lov_set_osc_active(struct lov_obd *lov, obd_uuid_t uuid,
288                               int activate)
289 {
290         struct obd_device *obd;
291         struct lov_tgt_desc *tgt;
292         int i, rc = 0;
293         ENTRY;
294
295         CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
296                lov, uuid, activate);
297
298         spin_lock(&lov->lov_lock);
299         for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
300                 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
301                        i, tgt->uuid, tgt->conn.addr);
302                 if (strncmp(uuid, tgt->uuid, sizeof(tgt->uuid)) == 0)
303                         break;
304         }
305
306         if (i == lov->desc.ld_tgt_count)
307                 GOTO(out, rc = -EINVAL);
308
309         obd = class_conn2obd(&tgt->conn);
310         if (obd == NULL) {
311                 LBUG();
312                 GOTO(out, rc = -ENOTCONN);
313         }
314
315         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LOV idx %d\n",
316                obd->obd_name, obd->obd_uuid, obd->obd_minor, obd,
317                obd->obd_type->typ_name, i);
318         if (strcmp(obd->obd_type->typ_name, "osc") != 0) {
319                 LBUG();
320                 GOTO(out, rc = -EBADF);
321         }
322
323         if (tgt->active == activate) {
324                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
325                        activate ? "" : "in");
326                 GOTO(out, rc = -EALREADY);
327         }
328
329         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
330
331         tgt->active = activate;
332         if (activate) {
333                 /*
334                  * foreach(export)
335                  *     foreach(open_file)
336                  *         if (file_handle uses this_osc)
337                  *             if (has_no_filehandle)
338                  *                 open(file_handle, this_osc);
339                  */
340                 /* XXX reconnect? */
341                 lov->desc.ld_active_tgt_count++;
342         } else {
343                 /*
344                  * Should I invalidate filehandles that refer to this OSC, so
345                  * that I reopen them during reactivation?
346                  */
347                 /* XXX disconnect from OSC? */
348                 lov->desc.ld_active_tgt_count--;
349         }
350
351 #warning "FIXME: walk open files list for objects that need opening"
352         EXIT;
353  out:
354         spin_unlock(&lov->lov_lock);
355         return rc;
356 }
357
358 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
359 {
360         struct obd_ioctl_data *data = buf;
361         struct lov_obd *lov = &obd->u.lov;
362         int rc = 0;
363         ENTRY;
364
365         if (data->ioc_inllen1 < 1) {
366                 CERROR("LOV setup requires an MDC UUID\n");
367                 RETURN(-EINVAL);
368         }
369
370         if (data->ioc_inllen1 > 37) {
371                 CERROR("mdc UUID must be 36 characters or less\n");
372                 RETURN(-EINVAL);
373         }
374
375         spin_lock_init(&lov->lov_lock);
376         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1);
377         if (!lov->mdcobd) {
378                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid,
379                        data->ioc_inlbuf1);
380                 rc = -EINVAL;
381         }
382         RETURN(rc);
383 }
384
385 static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
386 {
387         struct lov_file_handles *lfh = NULL;
388
389         if (!handle || !handle->addr)
390                 RETURN(NULL);
391
392         lfh = (struct lov_file_handles *)(unsigned long)(handle->addr);
393         if (!kmem_cache_validate(lov_file_cache, lfh))
394                 RETURN(NULL);
395
396         if (lfh->lfh_cookie != handle->cookie)
397                 RETURN(NULL);
398
399         return lfh;
400 }
401
402 /* the LOV expects oa->o_id to be set to the LOV object id */
403 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
404                       struct lov_stripe_md **ea)
405 {
406         struct obd_export *export = class_conn2export(conn);
407         struct lov_obd *lov;
408         struct lov_stripe_md *lsm;
409         struct lov_oinfo *loi;
410         struct obdo *tmp;
411         int ost_count, ost_idx;
412         int first = 1, obj_alloc = 0;
413         int rc = 0, i;
414         ENTRY;
415
416         LASSERT(ea);
417
418         if (!export)
419                 RETURN(-EINVAL);
420
421         lov = &export->exp_obd->u.lov;
422
423         if (!lov->desc.ld_active_tgt_count)
424                 RETURN(-EIO);
425
426         tmp = obdo_alloc();
427         if (!tmp)
428                 RETURN(-ENOMEM);
429
430         lsm = *ea;
431
432         if (!lsm) {
433                 rc = obd_alloc_memmd(conn, &lsm);
434                 if (rc < 0)
435                         GOTO(out_tmp, rc);
436
437                 rc = 0;
438                 lsm->lsm_magic = LOV_MAGIC;
439         }
440
441         ost_count = lov->desc.ld_tgt_count;
442
443         LASSERT(oa->o_valid & OBD_MD_FLID);
444         lsm->lsm_object_id = oa->o_id;
445         if (!lsm->lsm_stripe_size)
446                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
447
448         if (!*ea || lsm->lsm_stripe_offset >= ost_count) {
449                 int mult = lsm->lsm_object_id * lsm->lsm_stripe_count;
450                 int stripe_offset = mult % ost_count;
451                 int sub_offset = (mult / ost_count) % lsm->lsm_stripe_count;
452
453                 ost_idx = stripe_offset + sub_offset;
454         } else
455                 ost_idx = lsm->lsm_stripe_offset;
456
457         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
458                lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
459
460         loi = lsm->lsm_oinfo;
461         for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
462                 struct lov_stripe_md obj_md;
463                 struct lov_stripe_md *obj_mdp = &obj_md;
464                 int err;
465
466                 if (lov->tgts[ost_idx].active == 0) {
467                         CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
468                         continue;
469                 }
470
471                 /* create data objects with "parent" OA */
472                 memcpy(tmp, oa, sizeof(*tmp));
473                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
474                 err = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp);
475                 if (err) {
476                         if (lov->tgts[ost_idx].active) {
477                                 CERROR("error creating objid "LPX64" sub-object"
478                                        "on OST idx %d: rc = %d\n",
479                                        oa->o_id, ost_idx, err);
480                                 if (!rc)
481                                         rc = err;
482                         }
483                         continue;
484                 }
485                 loi->loi_id = tmp->o_id;
486                 loi->loi_ost_idx = ost_idx;
487                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
488                        lsm->lsm_object_id, loi->loi_id, ost_idx);
489
490                 if (first) {
491                         lsm->lsm_stripe_offset = ost_idx;
492                         first = 0;
493                 }
494
495                 ++obj_alloc;
496                 ++loi;
497
498                 /* If we have allocated enough objects, we are OK */
499                 if (obj_alloc == lsm->lsm_stripe_count) {
500                         rc = 0;
501                         GOTO(out_done, rc);
502                 }
503         }
504
505         if (*ea)
506                 GOTO(out_cleanup, rc);
507         else {
508                 struct lov_stripe_md *lsm_new;
509                 /* XXX LOV STACKING call into osc for sizes */
510                 int size = lov_stripe_md_size(obj_alloc);
511
512                 OBD_ALLOC(lsm_new, size);
513                 if (!lsm_new)
514                         GOTO(out_cleanup, rc = -ENOMEM);
515                 memcpy(lsm_new, lsm, size);
516                 /* XXX LOV STACKING call into osc for sizes */
517                 OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count));
518                 lsm = lsm_new;
519         }
520  out_done:
521         *ea = lsm;
522
523  out_tmp:
524         obdo_free(tmp);
525         return rc;
526
527  out_cleanup:
528         while (i-- > 0) {
529                 int err;
530
531                 --loi;
532                 /* destroy already created objects here */
533                 memcpy(tmp, oa, sizeof(*tmp));
534                 tmp->o_id = loi->loi_id;
535                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
536                 if (err)
537                         CERROR("Failed to uncreate objid "LPX64" subobj "
538                                LPX64" on OST idx %d: rc = %d\n",
539                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
540                                err);
541         }
542         if (!*ea)
543                 obd_free_memmd(conn, &lsm);
544         goto out_tmp;
545 }
546
547 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
548                        struct lov_stripe_md *lsm)
549 {
550         struct obdo tmp;
551         struct obd_export *export = class_conn2export(conn);
552         struct lov_obd *lov;
553         struct lov_oinfo *loi;
554         struct lov_file_handles *lfh = NULL;
555         int rc = 0, i;
556         ENTRY;
557
558         if (!lsm) {
559                 CERROR("LOV requires striping ea for destruction\n");
560                 RETURN(-EINVAL);
561         }
562
563         if (lsm->lsm_magic != LOV_MAGIC) {
564                 CERROR("LOV striping magic bad %#x != %#x\n",
565                        lsm->lsm_magic, LOV_MAGIC);
566                 RETURN(-EINVAL);
567         }
568
569         if (!export || !export->exp_obd)
570                 RETURN(-ENODEV);
571
572         if (oa->o_valid & OBD_MD_FLHANDLE)
573                 lfh = lov_handle2lfh(obdo_handle(oa));
574
575         lov = &export->exp_obd->u.lov;
576         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
577                 int err;
578                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
579                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
580                         /* Orphan clean up will (someday) fix this up. */
581                         continue;
582                 }
583
584                 memcpy(&tmp, oa, sizeof(tmp));
585                 tmp.o_id = loi->loi_id;
586                 if (lfh)
587                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
588                                sizeof(lfh->lfh_handles[i]));
589                 else
590                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
591                 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
592                                   NULL);
593                 if (err && lov->tgts[loi->loi_ost_idx].active) {
594                         CERROR("Error destroying objid "LPX64" subobj "
595                                LPX64" on OST idx %d\n: rc = %d",
596                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
597                         if (!rc)
598                                 rc = err;
599                 }
600         }
601         RETURN(rc);
602 }
603
604 /* compute object size given "stripeno" and the ost size */
605 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
606                                 int stripeno)
607 {
608         unsigned long ssize  = lsm->lsm_stripe_size;
609         unsigned long swidth = ssize * lsm->lsm_stripe_count;
610         unsigned long stripe_size;
611         obd_size lov_size;
612
613         if (ost_size == 0)
614                 return 0;
615
616         /* do_div(a, b) returns a % b, and a = a / b */
617         stripe_size = do_div(ost_size, ssize);
618
619         if (stripe_size)
620                 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
621         else
622                 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
623
624         return lov_size;
625 }
626
627 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
628                             struct lov_stripe_md *lsm, int stripeno, int *new)
629 {
630         if (*new) {
631                 obdo_cpy_md(tgt, src, valid);
632                 if (valid & OBD_MD_FLSIZE)
633                         tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
634                 *new = 0;
635         } else {
636                 if (valid & OBD_MD_FLSIZE) {
637                         /* this handles sparse files properly */
638                         obd_size lov_size;
639
640                         lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
641                         if (lov_size > tgt->o_size)
642                                 tgt->o_size = lov_size;
643                 }
644                 if (valid & OBD_MD_FLBLOCKS)
645                         tgt->o_blocks += src->o_blocks;
646                 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
647                         tgt->o_ctime = src->o_ctime;
648                 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
649                         tgt->o_mtime = src->o_mtime;
650         }
651 }
652
653 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
654                        struct lov_stripe_md *lsm)
655 {
656         struct obdo tmp;
657         struct obd_export *export = class_conn2export(conn);
658         struct lov_obd *lov;
659         struct lov_oinfo *loi;
660         struct lov_file_handles *lfh = NULL;
661         int i;
662         int new = 1;
663         ENTRY;
664
665         if (!lsm) {
666                 CERROR("LOV requires striping ea\n");
667                 RETURN(-EINVAL);
668         }
669
670         if (lsm->lsm_magic != LOV_MAGIC) {
671                 CERROR("LOV striping magic bad %#x != %#x\n",
672                        lsm->lsm_magic, LOV_MAGIC);
673                 RETURN(-EINVAL);
674         }
675
676         if (!export || !export->exp_obd)
677                 RETURN(-ENODEV);
678
679         lov = &export->exp_obd->u.lov;
680
681         if (oa->o_valid & OBD_MD_FLHANDLE)
682                 lfh = lov_handle2lfh(obdo_handle(oa));
683
684         CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
685                lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
686         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
687                 int err;
688
689                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
690                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
691                         continue;
692                 }
693
694                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
695                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
696                 /* create data objects with "parent" OA */
697                 memcpy(&tmp, oa, sizeof(tmp));
698                 tmp.o_id = loi->loi_id;
699                 if (lfh)
700                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
701                                sizeof(lfh->lfh_handles[i]));
702                 else
703                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
704
705                 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
706                 if (err) {
707                         if (lov->tgts[loi->loi_ost_idx].active) {
708                                 CERROR("Error getattr objid "LPX64" subobj "
709                                        LPX64" on OST idx %d: rc = %d\n",
710                                        oa->o_id, loi->loi_id, loi->loi_ost_idx,
711                                        err);
712                                 RETURN(err);
713                         }
714                 } else {
715                         lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &new);
716                 }
717         }
718
719         RETURN(0);
720 }
721
722 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
723                        struct lov_stripe_md *lsm)
724 {
725         struct obdo *tmp;
726         struct obd_export *export = class_conn2export(conn);
727         struct lov_obd *lov;
728         struct lov_oinfo *loi;
729         struct lov_file_handles *lfh = NULL;
730         int rc = 0, i;
731         ENTRY;
732
733         /* Note that this code is currently unused, hence LBUG(), just
734          * to know when/if it is ever revived that it needs cleanups.
735          */
736         LBUG();
737
738         if (!lsm) {
739                 CERROR("LOV requires striping ea\n");
740                 RETURN(-EINVAL);
741         }
742
743         if (lsm->lsm_magic != LOV_MAGIC) {
744                 CERROR("LOV striping magic bad %#x != %#x\n",
745                        lsm->lsm_magic, LOV_MAGIC);
746                 RETURN(-EINVAL);
747         }
748
749         if (!export || !export->exp_obd)
750                 RETURN(-ENODEV);
751
752         /* size changes should go through punch and not setattr */
753         LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
754
755         tmp = obdo_alloc();
756         if (!tmp)
757                 RETURN(-ENOMEM);
758
759         if (oa->o_valid & OBD_MD_FLHANDLE)
760                 lfh = lov_handle2lfh(obdo_handle(oa));
761
762         lov = &export->exp_obd->u.lov;
763         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
764                 int err;
765
766                 obdo_cpy_md(tmp, oa, oa->o_valid);
767
768                 if (lfh)
769                         memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
770                                 sizeof(lfh->lfh_handles[i]));
771                 else
772                         tmp->o_valid &= ~OBD_MD_FLHANDLE;
773
774                 tmp->o_id = loi->loi_id;
775
776                 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
777                 if (err) {
778                         CERROR("Error setattr objid "LPX64" subobj "LPX64
779                                " on OST idx %d: rc = %d\n",
780                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
781                         if (!rc)
782                                 rc = err;
783                 }
784         }
785         obdo_free(tmp);
786         RETURN(rc);
787 }
788
789 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
790                     struct lov_stripe_md *lsm)
791 {
792         struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
793         struct obd_export *export = class_conn2export(conn);
794         struct lov_obd *lov;
795         struct lov_oinfo *loi;
796         struct lov_file_handles *lfh = NULL;
797         struct lustre_handle *handle;
798         int new = 1;
799         int rc = 0, i;
800         ENTRY;
801
802         if (!lsm) {
803                 CERROR("LOV requires striping ea for opening\n");
804                 RETURN(-EINVAL);
805         }
806
807         if (lsm->lsm_magic != LOV_MAGIC) {
808                 CERROR("LOV striping magic bad %#x != %#x\n",
809                        lsm->lsm_magic, LOV_MAGIC);
810                 RETURN(-EINVAL);
811         }
812
813         if (!export || !export->exp_obd)
814                 RETURN(-ENODEV);
815
816         tmp = obdo_alloc();
817         if (!tmp)
818                 RETURN(-ENOMEM);
819
820         lfh = kmem_cache_alloc(lov_file_cache, GFP_KERNEL);
821         if (!lfh)
822                 GOTO(out_tmp, rc = -ENOMEM);
823         OBD_ALLOC(lfh->lfh_handles,
824                   lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
825         if (!lfh->lfh_handles)
826                 GOTO(out_lfh, rc = -ENOMEM);
827
828         lov = &export->exp_obd->u.lov;
829         oa->o_size = 0;
830         oa->o_blocks = 0;
831         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
832
833                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
834                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
835                         continue;
836                 }
837
838                 /* create data objects with "parent" OA */
839                 memcpy(tmp, oa, sizeof(*tmp));
840                 tmp->o_id = loi->loi_id;
841
842                 rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
843                 if (rc) {
844                         if (lov->tgts[loi->loi_ost_idx].active) {
845                                 CERROR("Error open objid "LPX64" subobj "LPX64
846                                        " on OST idx %d: rc = %d\n",
847                                        oa->o_id, lsm->lsm_oinfo[i].loi_id,
848                                        loi->loi_ost_idx, rc);
849                                 goto out_handles;
850                         }
851                         continue;
852                 }
853
854                 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new);
855
856                 if (tmp->o_valid & OBD_MD_FLHANDLE)
857                         memcpy(&lfh->lfh_handles[i], obdo_handle(tmp),
858                                sizeof(lfh->lfh_handles[i]));
859         }
860
861         handle = obdo_handle(oa);
862         
863         lfh->lfh_count = lsm->lsm_stripe_count;
864         get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
865         
866         handle->addr = (__u64)(unsigned long)lfh;
867         handle->cookie = lfh->lfh_cookie;
868         oa->o_valid |= OBD_MD_FLHANDLE;
869         spin_lock(&export->exp_lov_data.led_lock);
870         list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
871         spin_unlock(&export->exp_lov_data.led_lock);
872
873 out_tmp:
874         obdo_free(tmp);
875         RETURN(rc);
876
877 out_handles:
878         for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
879                 int err;
880
881                 if (lov->tgts[loi->loi_ost_idx].active == 0)
882                         continue;
883
884                 memcpy(tmp, oa, sizeof(*tmp));
885                 tmp->o_id = loi->loi_id;
886                 memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
887                        sizeof(lfh->lfh_handles[i]));
888
889                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
890                 if (err) {
891                         CERROR("Error closing objid "LPX64" subobj "LPX64
892                                " on OST idx %d after open error: rc = %d\n",
893                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
894                 }
895         }
896        
897         OBD_FREE(lfh->lfh_handles,
898                  lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
899 out_lfh:
900         lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
901         kmem_cache_free(lov_file_cache, lfh);
902         goto out_tmp;
903 }
904
905 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
906                      struct lov_stripe_md *lsm)
907 {
908         struct obdo tmp;
909         struct obd_export *export = class_conn2export(conn);
910         struct lov_obd *lov;
911         struct lov_oinfo *loi;
912         struct lov_file_handles *lfh = NULL;
913         int rc = 0, i;
914         ENTRY;
915
916         if (!lsm) {
917                 CERROR("LOV requires striping ea\n");
918                 RETURN(-EINVAL);
919         }
920
921         if (lsm->lsm_magic != LOV_MAGIC) {
922                 CERROR("LOV striping magic bad %#x != %#x\n",
923                        lsm->lsm_magic, LOV_MAGIC);
924                 RETURN(-EINVAL);
925         }
926
927         if (!export || !export->exp_obd)
928                 RETURN(-ENODEV);
929
930         if (oa->o_valid & OBD_MD_FLHANDLE)
931                 lfh = lov_handle2lfh(obdo_handle(oa));
932
933         lov = &export->exp_obd->u.lov;
934         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
935                 int err;
936
937                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
938                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
939                         continue;
940                 }
941
942                 /* create data objects with "parent" OA */
943                 memcpy(&tmp, oa, sizeof(tmp));
944                 tmp.o_id = loi->loi_id;
945                 if (lfh)
946                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
947                                sizeof(lfh->lfh_handles[i]));
948                 else
949                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
950
951                 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
952                 if (err) {
953                         CERROR("Error close objid "LPX64" subobj "LPX64
954                                " on OST idx %d: rc = %d\n",
955                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
956                         if (!rc)
957                                 rc = err;
958                 }
959         }
960         if (lfh) {
961                 list_del(&lfh->lfh_list);
962                 OBD_FREE(lfh->lfh_handles,
963                          lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
964                 lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
965                 kmem_cache_free(lov_file_cache, lfh);
966         }
967
968         RETURN(rc);
969 }
970
/* Fallback log2() for builds that do not already provide one.
 * NOTE(review): ffz() is not defined in this part of the file.  If it is the
 * kernel "find first zero bit" helper, ffz(~(n)) is the index of the lowest
 * set bit of n, which equals log2(n) only when n is a power of two - confirm
 * against the users of this macro. */
#ifndef log2
#define log2(n) ffz(~(n))
#endif
974
975 #warning FIXME: merge these two functions now that they are nearly the same
976
977 /* compute ost offset in stripe "stripeno" corresponding to offset "lov_off" */
978 static obd_off lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
979                                  int stripeno)
980 {
981         unsigned long ssize  = lsm->lsm_stripe_size;
982         unsigned long swidth = ssize * lsm->lsm_stripe_count;
983         unsigned long stripe_off, this_stripe;
984
985         if (lov_off == OBD_OBJECT_EOF || lov_off == 0)
986                 return lov_off;
987
988         /* do_div(a, b) returns a % b, and a = a / b */
989         stripe_off = do_div(lov_off, swidth);
990
991         this_stripe = stripeno * ssize;
992         if (stripe_off <= this_stripe)
993                 stripe_off = 0;
994         else {
995                 stripe_off -= this_stripe;
996
997                 if (stripe_off > ssize)
998                         stripe_off = ssize;
999         }
1000
1001
1002         return lov_off * ssize + stripe_off;
1003 }
1004
1005 /* compute which stripe number "lov_off" will be written into */
1006 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
1007 {
1008         unsigned long ssize  = lsm->lsm_stripe_size;
1009         unsigned long swidth = ssize * lsm->lsm_stripe_count;
1010         unsigned long stripe_off;
1011
1012         stripe_off = do_div(lov_off, swidth);
1013
1014         return stripe_off / ssize;
1015 }
1016
1017
1018 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1019  * we can send this 'punch' to just the authoritative node and the nodes
1020  * that the punch will affect. */
1021 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
1022                      struct lov_stripe_md *lsm,
1023                      obd_off start, obd_off end)
1024 {
1025         struct obdo tmp;
1026         struct obd_export *export = class_conn2export(conn);
1027         struct lov_obd *lov;
1028         struct lov_oinfo *loi;
1029         struct lov_file_handles *lfh = NULL;
1030         int rc = 0, i;
1031         ENTRY;
1032
1033         if (!lsm) {
1034                 CERROR("LOV requires striping ea\n");
1035                 RETURN(-EINVAL);
1036         }
1037
1038         if (lsm->lsm_magic != LOV_MAGIC) {
1039                 CERROR("LOV striping magic bad %#x != %#x\n",
1040                        lsm->lsm_magic, LOV_MAGIC);
1041                 RETURN(-EINVAL);
1042         }
1043
1044         if (!export || !export->exp_obd)
1045                 RETURN(-ENODEV);
1046
1047         if (oa->o_valid & OBD_MD_FLHANDLE)
1048                 lfh = lov_handle2lfh(obdo_handle(oa));
1049
1050         lov = &export->exp_obd->u.lov;
1051         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1052                 obd_off starti = lov_stripe_offset(lsm, start, i);
1053                 obd_off endi = lov_stripe_offset(lsm, end, i);
1054                 int err;
1055
1056                 if (starti == endi)
1057                         continue;
1058
1059                 /* create data objects with "parent" OA */
1060                 memcpy(&tmp, oa, sizeof(tmp));
1061                 tmp.o_id = loi->loi_id;
1062                 if (lfh)
1063                         memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
1064                                sizeof(lfh->lfh_handles[i]));
1065                 else
1066                         tmp.o_valid &= ~OBD_MD_FLHANDLE;
1067
1068                 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
1069                                 starti, endi);
1070                 if (err) {
1071                         CERROR("Error punch objid "LPX64" subobj "LPX64
1072                                " on OST idx %d: rc = %d\n",
1073                                oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
1074                         if (!rc)
1075                                 rc = err;
1076                 }
1077         }
1078         RETURN(rc);
1079 }
1080
1081 static inline int lov_brw(int cmd, struct lustre_handle *conn,
1082                           struct lov_stripe_md *lsm, obd_count oa_bufs,
1083                           struct brw_page *pga, struct obd_brw_set *set)
1084 {
1085         struct {
1086                 int bufct;
1087                 int index;
1088                 int subcount;
1089                 struct lov_stripe_md lsm;
1090                 int ost_idx;
1091         } *stripeinfo, *si, *si_last;
1092         struct obd_export *export = class_conn2export(conn);
1093         struct lov_obd *lov;
1094         struct brw_page *ioarr;
1095         struct lov_oinfo *loi;
1096         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1097         ENTRY;
1098
1099         if (!lsm) {
1100                 CERROR("LOV requires striping ea\n");
1101                 RETURN(-EINVAL);
1102         }
1103
1104         if (lsm->lsm_magic != LOV_MAGIC) {
1105                 CERROR("LOV striping magic bad %#x != %#x\n",
1106                        lsm->lsm_magic, LOV_MAGIC);
1107                 RETURN(-EINVAL);
1108         }
1109
1110         lov = &export->exp_obd->u.lov;
1111
1112         OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1113         if (!stripeinfo)
1114                 GOTO(out_cbdata, rc = -ENOMEM);
1115
1116         OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1117         if (!where)
1118                 GOTO(out_sinfo, rc = -ENOMEM);
1119
1120         OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1121         if (!ioarr)
1122                 GOTO(out_where, rc = -ENOMEM);
1123
1124         for (i = 0; i < oa_bufs; i++) {
1125                 where[i] = lov_stripe_number(lsm, pga[i].off);
1126                 stripeinfo[where[i]].bufct++;
1127         }
1128
1129         for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1130              i < stripe_count; i++, loi++, si_last = si, si++) {
1131                 if (i > 0)
1132                         si->index = si_last->index + si_last->bufct;
1133                 si->lsm.lsm_object_id = loi->loi_id;
1134                 si->ost_idx = loi->loi_ost_idx;
1135         }
1136
1137         for (i = 0; i < oa_bufs; i++) {
1138                 int which = where[i];
1139                 int shift;
1140
1141                 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1142                 LASSERT(shift < oa_bufs);
1143                 ioarr[shift] = pga[i];
1144                 ioarr[shift].off = lov_stripe_offset(lsm, pga[i].off, which);
1145                 stripeinfo[which].subcount++;
1146         }
1147
1148         for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1149                 int shift = si->index;
1150
1151                 if (si->bufct) {
1152                         LASSERT(shift < oa_bufs);
1153                         rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
1154                                      &si->lsm, si->bufct, &ioarr[shift], set);
1155                         if (rc)
1156                                 GOTO(out_ioarr, rc);
1157                 }
1158         }
1159
1160  out_ioarr:
1161         OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1162  out_where:
1163         OBD_FREE(where, sizeof(*where) * oa_bufs);
1164  out_sinfo:
1165         OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1166  out_cbdata:
1167         RETURN(rc);
1168 }
1169
1170 static struct lov_lock_handles *lov_newlockh(struct lov_stripe_md *lsm)
1171 {
1172         struct lov_lock_handles *lov_lockh;
1173
1174         OBD_ALLOC(lov_lockh, sizeof(*lov_lockh) +
1175                   sizeof(*lov_lockh->llh_handles) * lsm->lsm_stripe_count);
1176         if (!lov_lockh)
1177                 return NULL;
1178
1179         get_random_bytes(&lov_lockh->llh_cookie, sizeof(lov_lockh->llh_cookie));
1180
1181         return lov_lockh;
1182 }
1183
1184 /* We are only ever passed local lock handles here, so we do not need to
1185  * validate (and we can't really because these structs are variable sized
1186  * and therefore alloced, and not from a private slab).
1187  *
1188  * We just check because we can...
1189  */
1190 static struct lov_lock_handles *lov_h2lovlockh(struct lustre_handle *handle)
1191 {
1192         struct lov_lock_handles *lov_lockh = NULL;
1193
1194         if (!handle || !handle->addr)
1195                 RETURN(NULL);
1196
1197         lov_lockh = (struct lov_lock_handles *)(unsigned long)(handle->addr);
1198         if (lov_lockh->llh_cookie != handle->cookie)
1199                 RETURN(NULL);
1200
1201         return lov_lockh;
1202 }
1203
1204 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1205                        struct lustre_handle *parent_lock,
1206                        __u32 type, void *cookie, int cookielen, __u32 mode,
1207                        int *flags, void *cb, void *data, int datalen,
1208                        struct lustre_handle *lockh)
1209 {
1210         struct obd_export *export = class_conn2export(conn);
1211         struct lov_lock_handles *lov_lockh = NULL;
1212         struct lustre_handle *lov_lockhp;
1213         struct lov_obd *lov;
1214         struct lov_oinfo *loi;
1215         struct lov_stripe_md submd;
1216         int rc = 0, i;
1217         ENTRY;
1218
1219         if (!lsm) {
1220                 CERROR("LOV requires striping ea\n");
1221                 RETURN(-EINVAL);
1222         }
1223
1224         if (lsm->lsm_magic != LOV_MAGIC) {
1225                 CERROR("LOV striping magic bad %#x != %#x\n",
1226                        lsm->lsm_magic, LOV_MAGIC);
1227                 RETURN(-EINVAL);
1228         }
1229
1230         /* we should never be asked to replay a lock. */
1231
1232         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1233
1234         if (!export || !export->exp_obd)
1235                 RETURN(-ENODEV);
1236
1237         if (lsm->lsm_stripe_count > 1) {
1238                 lov_lockh = lov_newlockh(lsm);
1239                 if (!lov_lockh)
1240                         RETURN(-ENOMEM);
1241
1242                 lockh->addr = (__u64)(unsigned long)lov_lockh;
1243                 lockh->cookie = lov_lockh->llh_cookie;
1244                 lov_lockhp = lov_lockh->llh_handles;
1245         } else
1246                 lov_lockhp = lockh;
1247
1248         lov = &export->exp_obd->u.lov;
1249         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1250              i++, loi++, lov_lockhp++) {
1251                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1252                 struct ldlm_extent sub_ext;
1253
1254                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1255                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1256                         continue;
1257                 }
1258
1259                 *flags = 0;
1260                 sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
1261                 sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
1262                 if (sub_ext.start == sub_ext.end /* || !active */)
1263                         continue;
1264
1265                 /* XXX LOV STACKING: submd should be from the subobj */
1266                 submd.lsm_object_id = loi->loi_id;
1267                 submd.lsm_stripe_count = 0;
1268                 /* XXX submd is not fully initialized here */
1269                 *flags = 0;
1270                 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1271                                  parent_lock, type, &sub_ext, sizeof(sub_ext),
1272                                  mode, flags, cb, data, datalen, lov_lockhp);
1273                 // XXX add a lock debug statement here
1274                 if (rc)
1275                         memset(lov_lockhp, 0, sizeof(*lov_lockhp));
1276                 if (rc && lov->tgts[loi->loi_ost_idx].active) {
1277                         CERROR("Error enqueue objid "LPX64" subobj "LPX64
1278                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1279                                loi->loi_id, loi->loi_ost_idx, rc);
1280                         goto out_locks;
1281                 }
1282         }
1283         RETURN(0);
1284
1285 out_locks:
1286         while (loi--, lov_lockhp--, i-- > 0) {
1287                 struct lov_stripe_md submd;
1288                 int err;
1289
1290                 if (lov_lockhp->addr == 0 ||
1291                     lov->tgts[loi->loi_ost_idx].active == 0)
1292                         continue;
1293
1294                 /* XXX LOV STACKING: submd should be from the subobj */
1295                 submd.lsm_object_id = loi->loi_id;
1296                 submd.lsm_stripe_count = 0;
1297                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1298                                  mode, lov_lockhp);
1299                 if (err) {
1300                         CERROR("Error cancelling objid "LPX64
1301                                " on OST idx %d after enqueue error: rc = %d\n",
1302                                loi->loi_id, loi->loi_ost_idx, err);
1303                 }
1304         }
1305
1306         if (lsm->lsm_stripe_count > 1) {
1307                 lov_lockh->llh_cookie = DEAD_HANDLE_MAGIC;
1308                 OBD_FREE(lov_lockh, sizeof(*lov_lockh) +
1309                           sizeof(*lov_lockh->llh_handles) *
1310                           lsm->lsm_stripe_count);
1311         }
1312         lockh->addr = 0;
1313         lockh->cookie = DEAD_HANDLE_MAGIC;
1314
1315         RETURN(rc);
1316 }
1317
1318 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1319                       __u32 mode, struct lustre_handle *lockh)
1320 {
1321         struct obd_export *export = class_conn2export(conn);
1322         struct lov_lock_handles *lov_lockh = NULL;
1323         struct lustre_handle *lov_lockhp;
1324         struct lov_obd *lov;
1325         struct lov_oinfo *loi;
1326         int rc = 0, i;
1327         ENTRY;
1328
1329         if (!lsm) {
1330                 CERROR("LOV requires striping ea\n");
1331                 RETURN(-EINVAL);
1332         }
1333
1334         if (lsm->lsm_magic != LOV_MAGIC) {
1335                 CERROR("LOV striping magic bad %#x != %#x\n",
1336                        lsm->lsm_magic, LOV_MAGIC);
1337                 RETURN(-EINVAL);
1338         }
1339
1340         if (!export || !export->exp_obd)
1341                 RETURN(-ENODEV);
1342
1343         LASSERT(lockh);
1344         if (lsm->lsm_stripe_count > 1) {
1345                 lov_lockh = lov_h2lovlockh(lockh);
1346                 if (!lov_lockh) {
1347                         CERROR("LOV: invalid lov lock handle %p\n", lockh);
1348                         RETURN(-EINVAL);
1349                 }
1350
1351                 lov_lockhp = lov_lockh->llh_handles;
1352         } else
1353                 lov_lockhp = lockh;
1354
1355         lov = &export->exp_obd->u.lov;
1356         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1357              i++, loi++, lov_lockhp++ ) {
1358                 struct lov_stripe_md submd;
1359                 int err;
1360
1361                 if (lov_lockhp->addr == 0) {
1362                         CDEBUG(D_HA, "lov idx %d no lock?\n", loi->loi_ost_idx);
1363                         continue;
1364                 }
1365
1366                 /* XXX LOV STACKING: submd should be from the subobj */
1367                 submd.lsm_object_id = loi->loi_id;
1368                 submd.lsm_stripe_count = 0;
1369                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1370                                  mode, lov_lockhp);
1371                 if (err) {
1372                         if (lov->tgts[loi->loi_ost_idx].active) {
1373                                 CERROR("Error cancel objid "LPX64" subobj "
1374                                        LPX64" on OST idx %d: rc = %d\n",
1375                                        lsm->lsm_object_id,
1376                                        loi->loi_id, loi->loi_ost_idx, err);
1377                                 if (!rc)
1378                                         rc = err;
1379                         }
1380                 }
1381         }
1382
1383         if (lsm->lsm_stripe_count > 1) {
1384                 lov_lockh->llh_cookie = DEAD_HANDLE_MAGIC;
1385                 OBD_FREE(lov_lockh, sizeof(*lov_lockh) +
1386                           sizeof(*lov_lockh->llh_handles) *
1387                           lsm->lsm_stripe_count);
1388         }
1389         lockh->addr = 0;
1390         lockh->cookie = DEAD_HANDLE_MAGIC;
1391
1392         RETURN(rc);
1393 }
1394
1395 static int lov_cancel_unused(struct lustre_handle *conn,
1396                              struct lov_stripe_md *lsm, int flags)
1397 {
1398         struct obd_export *export = class_conn2export(conn);
1399         struct lov_obd *lov;
1400         struct lov_oinfo *loi;
1401         int rc = 0, i;
1402         ENTRY;
1403
1404         if (!lsm) {
1405                 CERROR("LOV requires striping ea for lock cancellation\n");
1406                 RETURN(-EINVAL);
1407         }
1408
1409         if (!export || !export->exp_obd)
1410                 RETURN(-ENODEV);
1411
1412         lov = &export->exp_obd->u.lov;
1413         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1414                 struct lov_stripe_md submd;
1415                 int err;
1416
1417                 submd.lsm_object_id = loi->loi_id;
1418                 submd.lsm_stripe_count = 0;
1419                 err = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
1420                                        &submd, flags);
1421                 if (err && lov->tgts[loi->loi_ost_idx].active) {
1422                         CERROR("Error cancel unused objid "LPX64" subobj "LPX64
1423                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1424                                loi->loi_id, loi->loi_ost_idx, err);
1425                         if (!rc)
1426                                 rc = err;
1427                 }
1428         }
1429
1430         RETURN(rc);
1431 }
1432
1433 static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1434 {
1435         struct obd_export *export = class_conn2export(conn);
1436         struct lov_obd *lov;
1437         struct obd_statfs lov_sfs;
1438         int set = 0;
1439         int rc = 0;
1440         int i;
1441         ENTRY;
1442
1443         if (!export || !export->exp_obd)
1444                 RETURN(-ENODEV);
1445
1446         lov = &export->exp_obd->u.lov;
1447
1448         /* We only get block data from the OBD */
1449         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1450                 int err;
1451
1452                 if (!lov->tgts[i].active) {
1453                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1454                         continue;
1455                 }
1456
1457                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
1458                 if (err) {
1459                         CERROR("Error statfs OSC %s i %d: err = %d\n",
1460                                lov->tgts[i].uuid, i, err);
1461                         if (!rc)
1462                                 rc = err;
1463                         continue; /* XXX or break? - probably OK to continue */
1464                 }
1465                 if (!set) {
1466                         memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
1467                         set = 1;
1468                 } else {
1469                         osfs->os_bfree += lov_sfs.os_bfree;
1470                         osfs->os_bavail += lov_sfs.os_bavail;
1471                         osfs->os_blocks += lov_sfs.os_blocks;
1472                         /* XXX not sure about this one - depends on policy.
1473                          *   - could be minimum if we always stripe on all OBDs
1474                          *     (but that would be wrong for any other policy,
1475                          *     if one of the OBDs has no more objects left)
1476                          *   - could be sum if we stripe whole objects
1477                          *   - could be average, just to give a nice number
1478                          *   - we just pick first OST and hope it is enough
1479                         sfs->f_ffree += lov_sfs.f_ffree;
1480                          */
1481                 }
1482         }
1483         RETURN(rc);
1484 }
1485
1486 static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
1487                          void *karg, void *uarg)
1488 {
1489         struct obd_device *obddev = class_conn2obd(conn);
1490         struct lov_obd *lov = &obddev->u.lov;
1491         int i, count = lov->desc.ld_tgt_count;
1492         int rc;
1493
1494         ENTRY;
1495
1496         switch (cmd) {
1497         case IOC_LOV_SET_OSC_ACTIVE: {
1498                 struct obd_ioctl_data *data = karg;
1499                 rc = lov_set_osc_active(lov,data->ioc_inlbuf1,data->ioc_offset);
1500                 break;
1501         }
1502         case OBD_IOC_LOV_GET_CONFIG: {
1503                 struct obd_ioctl_data *data = karg;
1504                 struct lov_tgt_desc *tgtdesc;
1505                 struct lov_desc *desc;
1506                 obd_uuid_t *uuidp;
1507                 char *buf = NULL;
1508
1509                 buf = NULL;
1510                 len = 0;
1511                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
1512                         RETURN(-EINVAL);
1513
1514                 data = (struct obd_ioctl_data *)buf;
1515
1516                 if (sizeof(*desc) > data->ioc_inllen1) {
1517                         OBD_FREE(buf, len);
1518                         RETURN(-EINVAL);
1519                 }
1520
1521                 if (sizeof(*uuidp) * count > data->ioc_inllen2) {
1522                         OBD_FREE(buf, len);
1523                         RETURN(-EINVAL);
1524                 }
1525
1526                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1527                 uuidp = (obd_uuid_t *)data->ioc_inlbuf2;
1528                 memcpy(desc, &(lov->desc), sizeof(*desc));
1529
1530                 tgtdesc = lov->tgts;
1531                 for (i = 0; i < count; i++, uuidp++, tgtdesc++)
1532                         memcpy(uuidp, tgtdesc->uuid, sizeof(*uuidp));
1533
1534                 rc = copy_to_user((void *)uarg, buf, len);
1535                 if (rc)
1536                         rc = -EFAULT;
1537                 OBD_FREE(buf, len);
1538                 break;
1539         }
1540         case LL_IOC_LOV_SETSTRIPE:
1541                 rc = lov_setstripe(conn, karg, uarg);
1542                 break;
1543         case LL_IOC_LOV_GETSTRIPE:
1544                 rc = lov_getstripe(conn, karg, uarg);
1545                 break;
1546         default:
1547                 if (count == 0)
1548                         RETURN(-ENOTTY);
1549                 rc = 0;
1550                 for (i = 0; i < count; i++) {
1551                         int err;
1552
1553                         err = obd_iocontrol(cmd, &lov->tgts[i].conn,
1554                                             len, karg, uarg);
1555                         if (err && !rc)
1556                                 rc = err;
1557                 }
1558         }
1559
1560         RETURN(rc);
1561 }
1562
1563 struct obd_ops lov_obd_ops = {
1564         o_owner:       THIS_MODULE,
1565         o_attach:      lov_attach,
1566         o_detach:      lov_detach,
1567         o_setup:       lov_setup,
1568         o_connect:     lov_connect,
1569         o_disconnect:  lov_disconnect,
1570         o_statfs:      lov_statfs,
1571         o_packmd:      lov_packmd,
1572         o_unpackmd:    lov_unpackmd,
1573         o_create:      lov_create,
1574         o_destroy:     lov_destroy,
1575         o_getattr:     lov_getattr,
1576         o_setattr:     lov_setattr,
1577         o_open:        lov_open,
1578         o_close:       lov_close,
1579         o_brw:         lov_brw,
1580         o_punch:       lov_punch,
1581         o_enqueue:     lov_enqueue,
1582         o_cancel:      lov_cancel,
1583         o_cancel_unused: lov_cancel_unused,
1584         o_iocontrol:   lov_iocontrol
1585 };
1586
1587
1588 #define LOV_VERSION "v0.1"
1589
1590 static int __init lov_init(void)
1591 {
1592         int rc;
1593         printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
1594                ", info@clusterfs.com\n");
1595         lov_file_cache = kmem_cache_create("ll_lov_file_data",
1596                                            sizeof(struct lov_file_handles),
1597                                            0, 0, NULL, NULL);
1598         if (!lov_file_cache)
1599                 RETURN(-ENOMEM);
1600
1601         rc = class_register_type(&lov_obd_ops, status_class_var,
1602                                  OBD_LOV_DEVICENAME);
1603         RETURN(rc);
1604 }
1605
1606 static void __exit lov_exit(void)
1607 {
1608         if (kmem_cache_destroy(lov_file_cache))
1609                 CERROR("couldn't free LOV open cache\n");
1610         class_unregister_type(OBD_LOV_DEVICENAME);
1611 }
1612
1613 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1614 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver " LOV_VERSION);
1615 MODULE_LICENSE("GPL");
1616
1617 module_init(lov_init);
1618 module_exit(lov_exit);