1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5 * Author: Phil Schwan <phil@clusterfs.com>
6 * Peter Braam <braam@clusterfs.com>
7 * Mike Shaver <shaver@clusterfs.com>
9 * This file is part of Lustre, http://www.lustre.org.
11 * Lustre is free software; you can redistribute it and/or
12 * modify it under the terms of version 2 of the GNU General Public
13 * License as published by the Free Software Foundation.
15 * Lustre is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
20 * You should have received a copy of the GNU General Public License
21 * along with Lustre; if not, write to the Free Software
22 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26 #define DEBUG_SUBSYSTEM S_LOV
28 #include <linux/slab.h>
29 #include <linux/module.h>
30 #include <linux/init.h>
31 #include <linux/random.h>
32 #include <linux/slab.h>
33 #include <linux/pagemap.h>
34 #include <asm/div64.h>
36 #include <liblustre.h>
39 #include <linux/obd_support.h>
40 #include <linux/lustre_lib.h>
41 #include <linux/lustre_net.h>
42 #include <linux/lustre_idl.h>
43 #include <linux/lustre_lite.h> /* for LL_IOC_LOV_[GS]ETSTRIPE */
44 #include <linux/lustre_mds.h>
45 #include <linux/obd_class.h>
46 #include <linux/obd_lov.h>
47 #include <linux/seq_file.h>
48 #include <linux/lprocfs_status.h>
/* Per-open state kept by the LOV for one file: a hashed handle, a reference
 * count, a list linkage and a pointer to the per-stripe client handles. */
50 struct lov_file_handles {
51 struct portals_handle lfh_handle; /* registered with class_handle_hash() */
52 atomic_t lfh_refcount; /* set to 2 at creation, freed on the last put */
53 struct list_head lfh_list; /* linked on the export led_open_head list */
55 struct obd_client_handle *lfh_och; /* array indexed by stripe number */
/* Reference-counted, hashed handle object that ends in a variable-length
 * array of lustre_handle; lov_llh_new() sizes that array to one entry per
 * stripe of the lov_stripe_md it is given. */
58 struct lov_lock_handles {
59 struct portals_handle llh_handle; /* registered with class_handle_hash() */
60 atomic_t llh_refcount; /* set to 2 at creation, freed on the last put */
62 struct lustre_handle llh_handles[0]; /* zero-length trailing array */
65 /* lov_file_handles helpers */
/* Take one reference on the lov_file_handles passed as an untyped pointer
 * and log the resulting count.  lov_lfh_new() hands this function to
 * class_handle_hash() as the addref callback, hence the void * parameter. */
66 static void lov_lfh_addref(void *lfhp)
68 struct lov_file_handles *lfh = lfhp;
70 atomic_inc(&lfh->lfh_refcount);
71 CDEBUG(D_INFO, "GETting lfh %p : new refcount %d\n", lfh,
72 atomic_read(&lfh->lfh_refcount));
/* Allocate a lov_file_handles, start its reference count at 2 and register
 * its handle in the class handle hash with lov_lfh_addref() as the addref
 * callback.  An allocation failure is reported through CERROR(). */
75 static struct lov_file_handles *lov_lfh_new(void)
77 struct lov_file_handles *lfh;
79 OBD_ALLOC(lfh, sizeof *lfh);
81 CERROR("out of memory\n");
/* NOTE(review): presumably one reference belongs to the handle hash and
 * one to the caller (lov_open() later hands its reference to the open
 * list); confirm against class_handle_hash(). */
85 atomic_set(&lfh->lfh_refcount, 2);
/* h_link must be an initialised list head before the handle is hashed */
87 INIT_LIST_HEAD(&lfh->lfh_handle.h_link);
88 class_handle_hash(&lfh->lfh_handle, lov_lfh_addref);
/* Translate a lustre_handle into the lov_file_handles registered under its
 * cookie by looking the cookie up with class_handle2object().
 * handle must not be NULL (asserted).
 * NOTE(review): lov_close() balances this call with a lov_lfh_put(), so the
 * lookup presumably takes a reference through the addref callback; confirm
 * against class_handle2object(). */
93 static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
96 LASSERT(handle != NULL);
97 RETURN(class_handle2object(handle->cookie));
/* Drop one reference on lfh and free the structure when the count reaches
 * zero.  The handle must already be unhashed at that point (its h_link list
 * head is asserted to be empty). */
100 static void lov_lfh_put(struct lov_file_handles *lfh)
/* the count is logged before the decrement, hence the "- 1" */
102 CDEBUG(D_INFO, "PUTting lfh %p : new refcount %d\n", lfh,
103 atomic_read(&lfh->lfh_refcount) - 1);
/* NOTE(review): the 0x5a5a upper bound looks like a sanity check against a
 * freed or poisoned object; confirm the poison pattern used by OBD_FREE. */
104 LASSERT(atomic_read(&lfh->lfh_refcount) > 0 &&
105 atomic_read(&lfh->lfh_refcount) < 0x5a5a);
106 if (atomic_dec_and_test(&lfh->lfh_refcount)) {
107 LASSERT(list_empty(&lfh->lfh_handle.h_link));
108 OBD_FREE(lfh, sizeof *lfh);
/* Remove lfh from the class handle hash so that its cookie can no longer be
 * resolved by lov_handle2lfh(). */
112 static void lov_lfh_destroy(struct lov_file_handles *lfh)
114 class_handle_unhash(&lfh->lfh_handle);
/* Take one reference on the lov_lock_handles passed as an untyped pointer
 * and log the resulting count.  lov_llh_new() hands this function to
 * class_handle_hash() as the addref callback, hence the void * parameter. */
118 static void lov_llh_addref(void *llhp)
120 struct lov_lock_handles *llh = llhp;
122 atomic_inc(&llh->llh_refcount);
123 CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
124 atomic_read(&llh->llh_refcount));
/* Allocate a lov_lock_handles with room for one lustre_handle per stripe of
 * lsm, start its reference count at 2, record the stripe count and register
 * the handle in the class handle hash with lov_llh_addref() as the addref
 * callback.  An allocation failure is reported through CERROR(). */
127 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
129 struct lov_lock_handles *llh;
/* header plus lsm_stripe_count trailing llh_handles[] entries */
131 OBD_ALLOC(llh, sizeof *llh +
132 sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
134 CERROR("out of memory\n");
137 atomic_set(&llh->llh_refcount, 2);
/* remembered so that lov_llh_put() can recompute the allocation size */
138 llh->llh_stripe_count = lsm->lsm_stripe_count;
139 INIT_LIST_HEAD(&llh->llh_handle.h_link);
140 class_handle_hash(&llh->llh_handle, lov_llh_addref);
/* Translate a lustre_handle into the lov_lock_handles registered under its
 * cookie by looking the cookie up with class_handle2object().
 * handle must not be NULL (asserted). */
144 static struct lov_lock_handles *lov_handle2llh(struct lustre_handle *handle)
147 LASSERT(handle != NULL);
148 RETURN(class_handle2object(handle->cookie));
/* Drop one reference on llh and free the structure, including its trailing
 * handle array, when the count reaches zero.  The handle must already be
 * unhashed at that point (its h_link list head is asserted to be empty). */
151 static void lov_llh_put(struct lov_lock_handles *llh)
/* the count is logged before the decrement, hence the "- 1" */
153 CDEBUG(D_INFO, "PUTting llh %p : new refcount %d\n", llh,
154 atomic_read(&llh->llh_refcount) - 1);
155 LASSERT(atomic_read(&llh->llh_refcount) > 0 &&
156 atomic_read(&llh->llh_refcount) < 0x5a5a);
157 if (atomic_dec_and_test(&llh->llh_refcount)) {
158 LASSERT(list_empty(&llh->llh_handle.h_link));
/* same size expression as the allocation in lov_llh_new() */
159 OBD_FREE(llh, sizeof *llh +
160 sizeof(*llh->llh_handles) * llh->llh_stripe_count);
/* Remove llh from the class handle hash so that its cookie can no longer be
 * resolved by lov_handle2llh(). */
164 static void lov_llh_destroy(struct lov_lock_handles *llh)
166 class_handle_unhash(&llh->llh_handle);
/* Attach-time hook for a LOV device: registers the lprocfs variables of the
 * device and creates a read-only (0444) "target_obd" proc entry under
 * dev->obd_proc_entry whose file operations are ll_proc_target_fops.
 * len and data are not referenced by the visible code. */
171 int lov_attach(struct obd_device *dev, obd_count len, void *data)
173 struct lprocfs_static_vars lvars;
174 struct proc_dir_entry *entry;
177 lprocfs_init_vars(&lvars);
178 rc = lprocfs_obd_attach(dev, lvars.obd_vars);
182 entry = create_proc_entry("target_obd", 0444, dev->obd_proc_entry);
185 entry->proc_fops = &ll_proc_target_fops;
/* Detach-time hook for a LOV device: returns the result of
 * lprocfs_obd_detach() for dev. */
192 int lov_detach(struct obd_device *dev)
194 return lprocfs_obd_detach(dev);
/* Connect a client to the LOV device.
 *
 * Visible steps, in order:
 *  - class_connect() on the LOV itself;
 *  - initialise the per-export open-file list and its lock;
 *  - connect temporarily to the MDC, fetch the LOV descriptor and target
 *    UUIDs with mdc_getlovinfo(), and disconnect again;
 *  - check that the descriptor UUID matches this device and that the default
 *    stripe width fits in an unsigned long;
 *  - allocate the target array and, for every target, look the OSC up by
 *    UUID, obd_connect() to it and register this LOV with it through
 *    IOC_OSC_REGISTER_LOV.
 * The tail of the function releases these resources in reverse order.
 *
 * conn:   handle passed to class_connect()
 * obd:    the LOV obd_device being connected to
 * cluuid: UUID of the connecting client
 */
197 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
198 struct obd_uuid *cluuid)
200 struct ptlrpc_request *req = NULL;
201 struct lov_obd *lov = &obd->u.lov;
202 struct client_obd *mdc = &lov->mdcobd->u.cli;
203 struct lov_desc *desc = &lov->desc;
204 struct lov_desc *mdesc;
205 struct lov_tgt_desc *tgts;
206 struct obd_export *exp;
207 struct lustre_handle mdc_conn;
208 struct obd_uuid lov_mds_uuid = {"LOV_MDS_UUID"};
209 struct obd_uuid *uuids;
213 rc = class_connect(conn, obd, cluuid);
217 /* We don't want to actually do the underlying connections more than
218 * once, so keep track. */
220 if (lov->refcount > 1)
223 exp = class_conn2export(conn);
224 spin_lock_init(&exp->exp_lov_data.led_lock);
225 INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
227 /* retrieve LOV metadata from MDS */
228 rc = obd_connect(&mdc_conn, lov->mdcobd, &lov_mds_uuid);
230 CERROR("cannot connect to mdc: rc = %d\n", rc);
/* the MDC connection is only needed for this one request, so it is dropped
 * right away; rc and rc2 are examined afterwards */
234 rc = mdc_getlovinfo(obd, &mdc_conn, &req);
235 rc2 = obd_disconnect(&mdc_conn, 0);
237 CERROR("cannot get lov info %d\n", rc);
242 CERROR("error disconnecting from MDS %d\n", rc2);
243 GOTO(out_req, rc = rc2);
246 /* mdc_getlovinfo() has checked and swabbed the reply. It has also
247 * done some simple checks (e.g. #uuids consistent with desc, uuid
248 * array fits in LOV_MAX_UUID_BUFFER_SIZE and all uuids are
249 * terminated), but I still need to verify it makes overall
251 mdesc = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*mdesc));
252 LASSERT (mdesc != NULL);
253 LASSERT_REPSWABBED (req, 0);
257 if (!obd_uuid_equals(&obd->obd_uuid, &desc->ld_uuid)) {
258 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
259 obd->obd_uuid.uuid, desc->ld_uuid.uuid);
260 GOTO(out_req, rc = -EINVAL);
263 /* Because of 64-bit divide/mod operations only work with a 32-bit
264 * divisor in a 32-bit kernel, we cannot support a stripe width
265 * of 4GB or larger on 32-bit CPUs.
267 if ((desc->ld_default_stripe_count ?
268 desc->ld_default_stripe_count : desc->ld_tgt_count) *
269 desc->ld_default_stripe_size > ~0UL) {
270 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
271 desc->ld_default_stripe_size,
272 desc->ld_default_stripe_count ?
273 desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
274 GOTO(out_req, rc = -EINVAL);
277 /* We know ld_tgt_count is reasonable (the array of UUIDS fits in
278 * the maximum buffer size, so we won't be making outrageous
279 * demands on memory here. */
/* bufsize is kept in the lov so that the same size can be passed to
 * OBD_FREE() later */
280 lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
281 OBD_ALLOC(lov->tgts, lov->bufsize);
283 CERROR("Out of memory\n");
284 GOTO(out_req, rc = -ENOMEM);
/* reply buffer 1 carries one obd_uuid per target */
287 uuids = lustre_msg_buf(req->rq_repmsg, 1,
288 sizeof(*uuids) * desc->ld_tgt_count);
289 LASSERT (uuids != NULL);
290 LASSERT_REPSWABBED (req, 1);
292 for (i = 0, tgts = lov->tgts; i < desc->ld_tgt_count; i++, tgts++) {
293 struct obd_uuid *uuid = &tgts->uuid;
294 struct obd_device *tgt_obd;
295 struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
297 /* NULL termination already checked */
300 tgt_obd = client_tgtuuid2obd(uuid);
303 CERROR("Target %s not attached\n", uuid->uuid);
304 GOTO(out_disc, rc = -EINVAL);
307 if (!tgt_obd->obd_set_up) {
308 CERROR("Target %s not set up\n", uuid->uuid);
309 GOTO(out_disc, rc = -EINVAL);
312 rc = obd_connect(&tgts->conn, tgt_obd, &lov_osc_uuid);
315 CERROR("Target %s connect error %d\n", uuid->uuid, rc);
/* hand the OSC a pointer to this LOV device */
319 rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &tgts->conn,
320 sizeof(struct obd_device *), obd, NULL);
322 CERROR("Target %s REGISTER_LOV error %d\n",
/* registration failed: undo the connect made just above */
324 obd_disconnect(&tgts->conn, 0);
328 desc->ld_active_tgt_count++;
/* the MDC maximum EA size is taken from obd_size_diskmd() on this LOV */
332 mdc->cl_max_mds_easize = obd_size_diskmd(conn, NULL);
333 ptlrpc_req_finished (req);
334 class_export_put(exp);
/* error unwinding starts here: disconnect targets connected so far, free
 * the target array, finish the request and drop the LOV connection */
339 struct obd_uuid uuid;
341 --desc->ld_active_tgt_count;
343 /* save for CERROR below; (we know it's terminated) */
345 rc2 = obd_disconnect(&tgts->conn, 0);
347 CERROR("error: LOV target %s disconnect on OST idx %d: "
348 "rc = %d\n", uuid.uuid, i, rc2);
350 OBD_FREE(lov->tgts, lov->bufsize);
352 ptlrpc_req_finished (req);
354 class_export_put(exp);
355 class_disconnect(conn, 0);
/* Disconnect a client from the LOV device.
 *
 * When the LOV reference count has dropped to zero every target connection
 * is disconnected (propagating obd_no_recov to the OSC first when it is set
 * on the LOV), active targets are marked inactive and the target array is
 * freed.  Any lov_file_handles still linked on the export open list are
 * then discarded, and finally class_disconnect() is called on conn.
 *
 * conn:     connection handle of the client
 * failover: passed through to obd_disconnect() for each target
 *
 * NOTE(review): lov_open() allocates lfh_och with sizeof *och per stripe,
 * while the discard loop here frees it with FD_OSTDATA_SIZE per stripe;
 * confirm that the two sizes are equal.
 */
359 static int lov_disconnect(struct lustre_handle *conn, int failover)
361 struct obd_device *obd = class_conn2obd(conn);
362 struct lov_obd *lov = &obd->u.lov;
363 struct obd_export *exp;
364 struct list_head *p, *n;
371 /* Only disconnect the underlying layers on the final disconnect. */
373 if (lov->refcount != 0)
376 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
377 if (obd->obd_no_recov) {
378 /* Pass it on to our clients.
379 * XXX This should be an argument to disconnect,
380 * XXX not a back-door flag on the OBD. Ah well.
382 struct obd_device *osc_obd =
383 class_conn2obd(&lov->tgts[i].conn);
384 osc_obd->obd_no_recov = 1;
386 rc = obd_disconnect(&lov->tgts[i].conn, failover);
388 if (lov->tgts[i].active) {
389 CERROR("Target %s disconnect error %d\n",
390 lov->tgts[i].uuid.uuid, rc);
394 if (lov->tgts[i].active) {
395 lov->desc.ld_active_tgt_count--;
396 lov->tgts[i].active = 0;
399 OBD_FREE(lov->tgts, lov->bufsize);
403 exp = class_conn2export(conn);
405 CERROR("export handle "LPU64" invalid! If you can reproduce, "
406 "please send a full debug log to phik\n", conn->cookie);
409 spin_lock(&exp->exp_lov_data.led_lock);
410 list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
411 /* XXX close these, instead of just discarding them? */
412 struct lov_file_handles *lfh;
413 lfh = list_entry(p, typeof(*lfh), lfh_list);
414 CERROR("discarding open LOV handle %p:"LPX64"\n",
415 lfh, lfh->lfh_handle.h_cookie);
/* unlink, release the per-stripe handle array, then unhash the handle */
416 list_del(&lfh->lfh_list);
417 OBD_FREE(lfh->lfh_och, lfh->lfh_count * FD_OSTDATA_SIZE);
418 lov_lfh_destroy(lfh);
421 spin_unlock(&exp->exp_lov_data.led_lock);
422 class_export_put(exp);
425 rc = class_disconnect(conn, 0);
/* Mark the LOV target whose UUID equals uuid as active or inactive.
 *
 * The target table is searched under lov_lock.  When the target is found,
 * its connection resolves to an OBD and its state differs from the
 * requested one, tgt->active is updated and ld_active_tgt_count is adjusted
 * up or down to match.
 *
 * lov:      LOV whose target table is searched
 * uuid:     UUID of the target to change
 * activate: requested state, compared against and stored in tgt->active
 */
431 * -EINVAL : UUID can't be found in the LOV's target list
432 * -ENOTCONN: The UUID is found, but the target connection is bad (!)
433 * -EBADF : The UUID is found, but the OBD is the wrong type (!)
435 static int lov_set_osc_active(struct lov_obd *lov, struct obd_uuid *uuid,
438 struct obd_device *obd;
439 struct lov_tgt_desc *tgt;
443 CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
444 lov, uuid->uuid, activate);
446 spin_lock(&lov->lov_lock);
447 for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
448 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
449 i, tgt->uuid.uuid, tgt->conn.cookie);
450 if (strncmp(uuid->uuid, tgt->uuid.uuid, sizeof uuid->uuid) == 0)
/* the loop ran off the end of the table: no target has this UUID */
454 if (i == lov->desc.ld_tgt_count)
455 GOTO(out, rc = -EINVAL);
457 obd = class_conn2obd(&tgt->conn);
459 /* This can happen if OST failure races with node shutdown */
460 GOTO(out, rc = -ENOTCONN);
463 CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LOV idx %d\n",
464 obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
465 obd->obd_type->typ_name, i);
/* NOTE(review): a wrong OBD type trips this assertion; no visible line
 * returns the -EBADF listed above. */
466 LASSERT(strcmp(obd->obd_type->typ_name, "osc") == 0);
468 if (tgt->active == activate) {
469 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
470 activate ? "" : "in");
474 CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
476 tgt->active = activate;
481 * if (file_handle uses this_osc)
482 * if (has_no_filehandle)
483 * open(file_handle, this_osc);
486 lov->desc.ld_active_tgt_count++;
489 * Should I invalidate filehandles that refer to this OSC, so
490 * that I reopen them during reactivation?
492 /* XXX disconnect from OSC? */
493 lov->desc.ld_active_tgt_count--;
496 #warning "FIXME: walk open files list for objects that need opening"
499 spin_unlock(&lov->lov_lock);
/* Set up a LOV device from an obd_ioctl_data buffer.
 *
 * The first inline buffer must be non-empty and is used as the MDC device
 * name: it is resolved with class_name2obd() and the result is stored in
 * lov->mdcobd.  The LOV spinlock is initialised on the way.  Both a missing
 * name and a failed lookup are reported through CERROR().
 *
 * obd: the LOV device being set up
 * len: not referenced by the visible code
 * buf: pointer to a struct obd_ioctl_data
 */
503 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
505 struct obd_ioctl_data *data = buf;
506 struct lov_obd *lov = &obd->u.lov;
510 if (data->ioc_inllen1 < 1) {
511 CERROR("LOV setup requires an MDC name\n");
515 spin_lock_init(&lov->lov_lock);
516 lov->mdcobd = class_name2obd(data->ioc_inlbuf1);
518 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid.uuid,
525 /* compute object size given "stripeno" and the ost size */
/* Translates the size of one stripe object (ost_size) into the file size it
 * implies, using the stripe size and stripe count from lsm.
 *
 * ost_size is split into a count of full stripe-size units and a remainder.
 * Two results are computed below: one adds the remainder inside stripe
 * "stripeno" after the full stripe widths, the other ends exactly at the
 * end of stripe "stripeno" in the previous stripe width.
 * NOTE(review): presumably the first form is used when the remainder is
 * non-zero and the second otherwise; the selecting condition is not shown
 * here, so confirm. */
526 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
529 unsigned long ssize = lsm->lsm_stripe_size;
/* swidth is the number of file bytes covered by one pass over all stripes */
530 unsigned long swidth = ssize * lsm->lsm_stripe_count;
531 unsigned long stripe_size;
537 /* do_div(a, b) returns a % b, and a = a / b */
538 stripe_size = do_div(ost_size, ssize);
541 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
543 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
/* Fold the attributes of one stripe object (src) into the attributes of the
 * whole file (tgt).
 *
 * Two treatments are visible.  The first merges into values already held by
 * tgt: the size becomes the larger of the current size and the file size
 * implied by this stripe, block counts and block sizes are summed, and
 * ctime and mtime keep the later value.  The second copies src into tgt
 * with obdo_cpy_md() and then replaces the size with the file size implied
 * by this stripe.
 * NOTE(review): presumably *set selects between the two (copy for the
 * first stripe, merge afterwards); the test is not shown here, so confirm.
 *
 * tgt:      whole-file attributes being built up
 * src:      attributes returned for one stripe object
 * valid:    OBD_MD_FL* bits naming the fields to take from src
 * lsm:      striping descriptor, used for the size translation
 * stripeno: index of the stripe that src describes
 * set:      caller-owned flag shared across the calls for one file
 */
548 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
549 struct lov_stripe_md *lsm, int stripeno, int *set)
552 if (valid & OBD_MD_FLSIZE) {
553 /* this handles sparse files properly */
556 lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
557 if (lov_size > tgt->o_size)
558 tgt->o_size = lov_size;
560 if (valid & OBD_MD_FLBLOCKS)
561 tgt->o_blocks += src->o_blocks;
562 if (valid & OBD_MD_FLBLKSZ)
563 tgt->o_blksize += src->o_blksize;
564 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
565 tgt->o_ctime = src->o_ctime;
566 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
567 tgt->o_mtime = src->o_mtime;
569 obdo_cpy_md(tgt, src, valid);
570 if (valid & OBD_MD_FLSIZE)
571 tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
576 /* the LOV expects oa->o_id to be set to the LOV object id */
/* Create the per-stripe objects backing one LOV object.
 *
 * A striping descriptor is obtained (obd_alloc_memmd() is called for it in
 * the visible code), given LOV_MAGIC, the object id from oa and, when no
 * stripe size is set, the default stripe size.  Starting at a chosen or
 * random target index, the targets are walked round-robin; inactive ones
 * are skipped and obd_create() is called on the others until
 * lsm_stripe_count objects exist.  When fewer objects could be created a
 * smaller descriptor is allocated to replace the original one.  On failure
 * the objects already created are destroyed again and the descriptor is
 * released.
 *
 * conn: connection handle of the client
 * oa:   object attributes; o_id must be valid (asserted)
 * ea:   in/out pointer to the striping descriptor
 * oti:  transaction info passed through to obd_create()
 */
577 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
578 struct lov_stripe_md **ea, struct obd_trans_info *oti)
580 struct obd_export *export = class_conn2export(conn);
582 struct lov_stripe_md *lsm;
583 struct lov_oinfo *loi;
585 unsigned ost_count, ost_idx;
586 int set = 0, obj_alloc = 0;
593 GOTO(out_exp, rc = -EINVAL);
595 lov = &export->exp_obd->u.lov;
/* nothing can be created when no target is active */
597 if (!lov->desc.ld_active_tgt_count)
598 GOTO(out_exp, rc = -EIO);
602 GOTO(out_exp, rc = -ENOMEM);
607 rc = obd_alloc_memmd(conn, &lsm);
612 lsm->lsm_magic = LOV_MAGIC;
615 ost_count = lov->desc.ld_tgt_count;
617 LASSERT(oa->o_valid & OBD_MD_FLID);
618 lsm->lsm_object_id = oa->o_id;
619 if (!lsm->lsm_stripe_size)
620 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
622 if (!*ea || lsm->lsm_stripe_offset >= ost_count) {
/* NOTE(review): only 2 of the bytes of ost_idx are filled in here and
 * ost_idx has no initialiser in its declaration above; the modulo keeps the
 * result in range, but confirm the remaining bytes are defined. */
623 get_random_bytes(&ost_idx, 2);
624 ost_idx %= ost_count;
626 ost_idx = lsm->lsm_stripe_offset;
628 CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
629 lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
631 loi = lsm->lsm_oinfo;
/* visit each target at most once, wrapping around the target table */
632 for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
633 struct lov_stripe_md obj_md;
634 struct lov_stripe_md *obj_mdp = &obj_md;
637 if (lov->tgts[ost_idx].active == 0) {
638 CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
642 /* create data objects with "parent" OA */
643 memcpy(tmp, oa, sizeof(*tmp));
644 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
645 err = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp, oti);
647 if (lov->tgts[ost_idx].active) {
648 CERROR("error creating objid "LPX64" sub-object"
649 " on OST idx %d/%d: rc = %d\n", oa->o_id,
650 ost_idx, lsm->lsm_stripe_count, err);
652 CERROR("obd_create returned invalid "
/* record which object on which target backs this stripe */
661 loi->loi_id = tmp->o_id;
662 loi->loi_ost_idx = ost_idx;
663 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
664 lsm->lsm_object_id, loi->loi_id, ost_idx);
667 lsm->lsm_stripe_offset = ost_idx;
668 lov_merge_attrs(oa, tmp, OBD_MD_FLBLKSZ, lsm, obj_alloc, &set);
669 ot_init(&loi->loi_dirty_ot_inline);
670 loi->loi_dirty_ot = &loi->loi_dirty_ot_inline;
675 /* If we have allocated enough objects, we are OK */
676 if (obj_alloc == lsm->lsm_stripe_count)
677 GOTO(out_done, rc = 0);
681 CERROR("can't lstripe objid "LPX64": have %u want %u, rc %d\n",
682 lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count,rc);
685 GOTO(out_cleanup, rc);
687 struct lov_stripe_md *lsm_new;
688 /* XXX LOV STACKING call into osc for sizes */
689 unsigned size = lov_stripe_md_size(obj_alloc);
/* NOTE(review): the message reads "old ... new ..." but obj_alloc (the
 * count the new descriptor is sized for) is passed first and the original
 * lsm_stripe_count second; the two arguments look swapped. */
691 CERROR("reallocating LSM for objid "LPX64": old %u new %u\n",
692 lsm->lsm_object_id, obj_alloc, lsm->lsm_stripe_count);
693 OBD_ALLOC(lsm_new, size);
695 GOTO(out_cleanup, rc = -ENOMEM);
/* copy only as much of the old descriptor as the smaller one can hold */
696 memcpy(lsm_new, lsm, size);
697 lsm_new->lsm_stripe_count = obj_alloc;
699 /* XXX LOV STACKING call into osc for sizes */
700 OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count));
711 class_export_put(export);
/* failure path: walk back over the objects created so far */
715 while (obj_alloc-- > 0) {
719 /* destroy already created objects here */
720 memcpy(tmp, oa, sizeof(*tmp));
721 tmp->o_id = loi->loi_id;
722 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL,
725 CERROR("Failed to uncreate objid "LPX64" subobj "
726 LPX64" on OST idx %d: rc = %d\n",
727 oa->o_id, loi->loi_id, loi->loi_ost_idx,
731 obd_free_memmd(conn, &lsm);
/* Sanity check for a striping descriptor pointer.  LSMP is evaluated once
 * into a local.  One error is logged for a missing striping EA and another
 * when lsm_magic differs from LOV_MAGIC.  The entry points in this file
 * that use the macro return -EINVAL when it evaluates to true. */
735 #define lsm_bad_magic(LSMP) \
737 struct lov_stripe_md *_lsm__ = (LSMP); \
740 CERROR("LOV requires striping ea\n"); \
742 } else if (_lsm__->lsm_magic != LOV_MAGIC) { \
743 CERROR("LOV striping magic bad %#x != %#x\n", \
744 _lsm__->lsm_magic, LOV_MAGIC); \
/* Destroy every stripe object of a LOV object.
 *
 * For each stripe in lsm whose target is active, a copy of oa is given the
 * id of the stripe object (and, where a file handle was supplied, the
 * per-stripe handle from lfh_och) and passed to obd_destroy() on that
 * target.  Stripes on inactive targets are skipped.  Errors from targets
 * that are still active are logged.
 *
 * conn: connection handle of the client
 * oa:   attributes of the LOV object; OBD_MD_FLHANDLE selects handle lookup
 * lsm:  striping descriptor, validated with lsm_bad_magic()
 * oti:  transaction info
 */
750 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
751 struct lov_stripe_md *lsm, struct obd_trans_info *oti)
754 struct obd_export *export = class_conn2export(conn);
756 struct lov_oinfo *loi;
757 struct lov_file_handles *lfh = NULL;
761 if (lsm_bad_magic(lsm))
762 GOTO(out, rc = -EINVAL);
764 if (!export || !export->exp_obd)
765 GOTO(out, rc = -ENODEV);
767 if (oa->o_valid & OBD_MD_FLHANDLE)
768 lfh = lov_handle2lfh(obdo_handle(oa));
770 lov = &export->exp_obd->u.lov;
771 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
773 if (lov->tgts[loi->loi_ost_idx].active == 0) {
774 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
775 /* Orphan clean up will (someday) fix this up. */
/* build the per-stripe request from the whole-object attributes */
779 memcpy(&tmp, oa, sizeof(tmp));
780 tmp.o_id = loi->loi_id;
782 memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
785 tmp.o_valid &= ~OBD_MD_FLHANDLE;
786 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
788 if (err && lov->tgts[loi->loi_ost_idx].active) {
789 CERROR("error: destroying objid "LPX64" subobj "
790 LPX64" on OST idx %d: rc = %d\n",
791 oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
800 class_export_put(export);
/* Fetch the attributes of a LOV object synchronously.
 *
 * For each stripe in lsm whose target is active, a copy of oa is given the
 * id of the stripe object (and, where a file handle was supplied, the
 * per-stripe handle) and passed to obd_getattr() on that target.  Each
 * reply is folded into oa with lov_merge_attrs().  Stripes on inactive
 * targets are skipped.
 *
 * conn: connection handle of the client
 * oa:   in/out attributes of the LOV object
 * lsm:  striping descriptor, validated with lsm_bad_magic()
 */
804 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
805 struct lov_stripe_md *lsm)
808 struct obd_export *export = class_conn2export(conn);
810 struct lov_oinfo *loi;
811 struct lov_file_handles *lfh = NULL;
812 int i, rc = 0, set = 0;
815 if (lsm_bad_magic(lsm))
816 GOTO(out, rc = -EINVAL);
818 if (!export || !export->exp_obd)
819 GOTO(out, rc = -ENODEV);
821 lov = &export->exp_obd->u.lov;
823 if (oa->o_valid & OBD_MD_FLHANDLE)
824 lfh = lov_handle2lfh(obdo_handle(oa));
826 CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
827 lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
828 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
831 if (lov->tgts[loi->loi_ost_idx].active == 0) {
832 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
836 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
837 "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
838 /* create data objects with "parent" OA */
839 memcpy(&tmp, oa, sizeof(tmp));
840 tmp.o_id = loi->loi_id;
842 memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
845 tmp.o_valid &= ~OBD_MD_FLHANDLE;
847 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
849 if (lov->tgts[loi->loi_ost_idx].active) {
850 CERROR("error: getattr objid "LPX64" subobj "
851 LPX64" on OST idx %d: rc = %d\n",
852 oa->o_id, loi->loi_id, loi->loi_ost_idx,
/* merge using the validity bits that the target actually returned */
857 lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &set);
866 class_export_put(export);
/* Completion callback installed on the request set by lov_getattr_async().
 *
 * Walks the per-stripe obdo array saved in aa, skips entries whose o_valid
 * is zero (stripes that were never sent because their target was inactive)
 * and folds the others into aa->aa_oa with lov_merge_attrs().  An error is
 * logged when no stripe supplied valid attributes.  The per-stripe array is
 * freed here.
 *
 * rqset: request set that completed
 * aa:    arguments stored by lov_getattr_async()
 * rc:    completion status of the set
 */
870 static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
871 struct lov_getattr_async_args *aa, int rc)
873 struct lov_stripe_md *lsm = aa->aa_lsm;
874 struct obdo *oa = aa->aa_oa;
875 struct obdo *obdos = aa->aa_stripe_oas;
876 struct lov_oinfo *loi;
882 /* NB all stripe requests succeeded to get here */
884 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
886 if (obdos[i].o_valid == 0) /* inactive stripe */
889 lov_merge_attrs(oa, &obdos[i], obdos[i].o_valid, lsm,
894 CERROR ("No stripes had valid attrs\n");
/* same size expression as the allocation in lov_getattr_async() */
899 OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
/* Start an asynchronous attribute fetch for a LOV object.
 *
 * One obdo per stripe is allocated.  For each stripe whose target is active
 * the matching entry is filled from oa, given the id of the stripe object
 * (and the per-stripe handle where a file handle was supplied) and queued
 * with obd_getattr_async() on rqset.  Entries for inactive targets are left
 * untouched.  lov_getattr_interpret() is then installed as the interpreter
 * of the set, with its arguments stored in rqset->set_args.
 *
 * conn:  connection handle of the client
 * oa:    in/out attributes of the LOV object
 * lsm:   striping descriptor
 * rqset: request set that the per-stripe requests are added to; it must not
 *        already have an interpreter (asserted)
 *
 * NOTE(review): the two descriptor checks below repeat what the
 * lsm_bad_magic() macro does for the other entry points in this file.
 */
903 static int lov_getattr_async (struct lustre_handle *conn, struct obdo *oa,
904 struct lov_stripe_md *lsm,
905 struct ptlrpc_request_set *rqset)
908 struct obd_export *export = class_conn2export(conn);
910 struct lov_oinfo *loi;
911 struct lov_file_handles *lfh = NULL;
912 struct lov_getattr_async_args *aa;
919 CERROR("LOV requires striping ea\n");
920 GOTO(out, rc = -EINVAL);
923 if (lsm->lsm_magic != LOV_MAGIC) {
924 CERROR("LOV striping magic bad %#x != %#x\n",
925 lsm->lsm_magic, LOV_MAGIC);
926 GOTO(out, rc = -EINVAL);
929 if (!export || !export->exp_obd)
930 GOTO(out, rc = -ENODEV);
932 lov = &export->exp_obd->u.lov;
934 OBD_ALLOC (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
936 GOTO (out, rc = -ENOMEM);
938 if (oa->o_valid & OBD_MD_FLHANDLE)
939 lfh = lov_handle2lfh(obdo_handle(oa));
941 CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
942 lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
943 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
946 if (lov->tgts[loi->loi_ost_idx].active == 0) {
947 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
948 /* leaves obdos[i].obd_valid unset */
952 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
953 "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
954 /* create data objects with "parent" OA */
955 memcpy(&obdos[i], oa, sizeof(obdos[i]));
956 obdos[i].o_id = loi->loi_id;
958 memcpy(obdo_handle(&obdos[i]), lfh->lfh_och + i,
961 obdos[i].o_valid &= ~OBD_MD_FLHANDLE;
963 err = obd_getattr_async (&lov->tgts[loi->loi_ost_idx].conn,
964 &obdos[i], NULL, rqset);
966 CERROR("error: getattr objid "LPX64" subobj "
967 LPX64" on OST idx %d: rc = %d\n",
968 oa->o_id, loi->loi_id, loi->loi_ost_idx,
970 GOTO(out_obdos, rc = err);
975 GOTO (out_obdos, rc = -EIO);
/* the interpreter arguments live inside the request set itself, so the
 * argument structure must fit in set_args */
977 LASSERT (rqset->set_interpret == NULL);
978 rqset->set_interpret = lov_getattr_interpret;
979 LASSERT (sizeof (rqset->set_args) >= sizeof (*aa));
980 aa = (struct lov_getattr_async_args *)&rqset->set_args;
983 aa->aa_stripe_oas = obdos;
/* error path: the per-stripe array is released here instead of in the
 * interpreter */
987 OBD_FREE (obdos, lsm->lsm_stripe_count * sizeof (*obdos));
991 class_export_put(export);
/* Apply an attribute change to every stripe object of a LOV object.
 *
 * Only id, type and mtime bits may be set in oa->o_valid (asserted); size
 * changes are rejected here.  For each stripe whose target is active the
 * valid fields of oa are copied into a scratch obdo, the id of the stripe
 * object is filled in and obd_setattr() is called on that target.  Errors
 * from targets that are still active are logged.
 *
 * conn: connection handle of the client
 * oa:   attributes to apply
 * lsm:  striping descriptor, validated with lsm_bad_magic()
 * oti:  transaction info
 */
995 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
996 struct lov_stripe_md *lsm, struct obd_trans_info *oti)
999 struct obd_export *export = class_conn2export(conn);
1000 struct lov_obd *lov;
1001 struct lov_oinfo *loi;
1002 struct lov_file_handles *lfh = NULL;
1003 int rc = 0, i, set = 0;
1006 if (lsm_bad_magic(lsm))
1007 GOTO(out, rc = -EINVAL);
1009 if (!export || !export->exp_obd)
1010 GOTO(out, rc = -ENODEV);
1012 /* size changes should go through punch and not setattr */
1013 LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
1015 /* for now, we only expect mtime updates here */
1016 LASSERT(!(oa->o_valid & ~(OBD_MD_FLID |OBD_MD_FLTYPE |OBD_MD_FLMTIME)));
1020 GOTO(out, rc = -ENOMEM);
/* NOTE(review): the assertion above rejects every bit outside id, type and
 * mtime, so unless OBD_MD_FLHANDLE aliases one of those bits this test can
 * never be true while assertions are enabled; confirm. */
1022 if (oa->o_valid & OBD_MD_FLHANDLE)
1023 lfh = lov_handle2lfh(obdo_handle(oa));
1025 lov = &export->exp_obd->u.lov;
1026 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1029 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1030 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1034 obdo_cpy_md(tmp, oa, oa->o_valid);
1037 memcpy(obdo_handle(tmp), lfh->lfh_och + i,
1040 tmp->o_valid &= ~OBD_MD_FLHANDLE;
1042 tmp->o_id = loi->loi_id;
1044 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp,
1047 if (lov->tgts[loi->loi_ost_idx].active) {
1048 CERROR("error: setattr objid "LPX64" subobj "
1049 LPX64" on OST idx %d: rc = %d\n",
1050 oa->o_id, loi->loi_id, loi->loi_ost_idx,
1065 class_export_put(export);
/* Open every stripe object of a LOV object.
 *
 * A lov_file_handles is created together with an array of one client handle
 * per stripe.  For each stripe whose target is active, obd_open() is called
 * on that target with the matching array slot and the returned attributes
 * are folded into oa.  On success the cookie of the new handle is stored in
 * both och and oa, OBD_MD_FLHANDLE is set in oa and the handle is linked on
 * the open list of the export.  The tail of the function closes the stripes
 * opened so far, frees the handle array and unhashes the handle.
 *
 * conn: connection handle of the client
 * oa:   in/out attributes of the LOV object
 * lsm:  striping descriptor, validated with lsm_bad_magic()
 * oti:  transaction info
 * och:  client handle that receives the cookie; must not be NULL (asserted)
 *
 * NOTE(review): the handle array is allocated with sizeof *och per stripe
 * but freed with FD_OSTDATA_SIZE per stripe; confirm the sizes are equal.
 */
1069 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
1070 struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1071 struct obd_client_handle *och)
1073 struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
1074 struct obd_export *export = class_conn2export(conn);
1075 struct lov_obd *lov;
1076 struct lov_oinfo *loi;
1077 struct lov_file_handles *lfh = NULL;
1078 int set = 0, rc = 0, i;
1080 LASSERT(och != NULL);
1082 if (lsm_bad_magic(lsm))
1083 GOTO(out_exp, rc = -EINVAL);
1085 if (!export || !export->exp_obd)
1086 GOTO(out_exp, rc = -ENODEV);
1090 GOTO(out_exp, rc = -ENOMEM);
1092 lfh = lov_lfh_new();
1094 GOTO(out_tmp, rc = -ENOMEM);
1095 OBD_ALLOC(lfh->lfh_och, lsm->lsm_stripe_count * sizeof *och);
1097 GOTO(out_lfh, rc = -ENOMEM);
1099 lov = &export->exp_obd->u.lov;
1102 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1103 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1104 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1108 /* create data objects with "parent" OA */
1109 memcpy(tmp, oa, sizeof(*tmp));
1110 tmp->o_id = loi->loi_id;
/* the open handle for stripe i is written into slot i of lfh_och */
1112 rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp,
1113 NULL, NULL, lfh->lfh_och + i);
1115 if (!lov->tgts[loi->loi_ost_idx].active) {
1119 CERROR("error: open objid "LPX64" subobj "LPX64
1120 " on OST idx %d: rc = %d\n",
1121 oa->o_id, lsm->lsm_oinfo[i].loi_id,
1122 loi->loi_ost_idx, rc);
1126 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &set);
/* publish the cookie so later calls can find lfh through the handle */
1129 lfh->lfh_count = lsm->lsm_stripe_count;
1130 och->och_fh.cookie = lfh->lfh_handle.h_cookie;
1131 obdo_handle(oa)->cookie = lfh->lfh_handle.h_cookie;
1132 oa->o_valid |= OBD_MD_FLHANDLE;
1134 /* lfh refcount transfers to list */
1135 spin_lock(&export->exp_lov_data.led_lock);
1136 list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
1137 spin_unlock(&export->exp_lov_data.led_lock);
1143 class_export_put(export);
/* error unwinding: close, in reverse order, the stripes before the one
 * that failed */
1147 for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
1150 if (lov->tgts[loi->loi_ost_idx].active == 0)
1153 memcpy(tmp, oa, sizeof(*tmp));
1154 tmp->o_id = loi->loi_id;
1155 memcpy(obdo_handle(tmp), lfh->lfh_och + i, FD_OSTDATA_SIZE);
1157 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp,
1159 if (err && lov->tgts[loi->loi_ost_idx].active) {
1160 CERROR("error: closing objid "LPX64" subobj "LPX64
1161 " on OST idx %d after open error: rc=%d\n",
1162 oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
1166 OBD_FREE(lfh->lfh_och, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
1168 lov_lfh_destroy(lfh);
/* Close every stripe object of a LOV object.
 *
 * For each stripe a copy of oa is given the id of the stripe object (and,
 * where a file handle was supplied, the per-stripe handle) and passed to
 * obd_close() on its target; errors from targets that are still active are
 * logged.  Afterwards the file handle is unlinked from the open list of the
 * export, its per-stripe handle array is freed, the handle is unhashed and
 * the references held by the list and by the lookup are dropped.
 *
 * conn: connection handle of the client
 * oa:   attributes of the LOV object; OBD_MD_FLHANDLE selects handle lookup
 * lsm:  striping descriptor, validated with lsm_bad_magic()
 * oti:  transaction info
 */
1173 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
1174 struct lov_stripe_md *lsm, struct obd_trans_info *oti)
1177 struct obd_export *export = class_conn2export(conn);
1178 struct lov_obd *lov;
1179 struct lov_oinfo *loi;
1180 struct lov_file_handles *lfh = NULL;
1184 if (lsm_bad_magic(lsm))
1185 GOTO(out, rc = -EINVAL);
1187 if (!export || !export->exp_obd)
1188 GOTO(out, rc = -ENODEV);
1190 if (oa->o_valid & OBD_MD_FLHANDLE)
1191 lfh = lov_handle2lfh(obdo_handle(oa));
1193 lov = &export->exp_obd->u.lov;
1194 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1197 /* create data objects with "parent" OA */
1198 memcpy(&tmp, oa, sizeof(tmp));
1199 tmp.o_id = loi->loi_id;
1201 memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
1204 tmp.o_valid &= ~OBD_MD_FLHANDLE;
1206 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
1209 if (lov->tgts[loi->loi_ost_idx].active) {
1210 CERROR("error: close objid "LPX64" subobj "LPX64
1211 " on OST idx %d: rc = %d\n", oa->o_id,
1212 loi->loi_id, loi->loi_ost_idx, err);
/* the open list is protected by the export led_lock */
1219 spin_lock(&export->exp_lov_data.led_lock);
1220 list_del(&lfh->lfh_list);
1221 spin_unlock(&export->exp_lov_data.led_lock);
1222 lov_lfh_put(lfh); /* drop the reference owned by the list */
1224 OBD_FREE(lfh->lfh_och, lsm->lsm_stripe_count * FD_OSTDATA_SIZE);
1225 lov_lfh_destroy(lfh);
1226 lov_lfh_put(lfh); /* balance handle2lfh above */
1230 class_export_put(export);
/* Integer log2 built on ffz(): complementing n turns its set bits into
 * zeros, so ffz(~(n)) yields the position of the lowest set bit of n.
 * NOTE(review): that equals log2(n) only when n is a power of two; confirm
 * that every caller passes such a value. */
1235 #define log2(n) ffz(~(n))
1238 /* we have an offset in file backed by an lov and want to find out where
1239 * that offset lands in our given stripe of the file. for the easy
1240 * case where the offset is within the stripe, we just have to scale the
1241 * offset down to make it relative to the stripe instead of the lov.
1243 * the harder case is what to do when the offset doesn't intersect the
1244 * stripe. callers will want start offsets clamped ahead to the start
1245 * of the nearest stripe in the file. end offsets similarly clamped to the
1246 * nearest ending byte of a stripe in the file:
1248 * all this function does is move offsets to the nearest region of the
1249 * stripe, and it does its work "mod" the full length of all the stripes.
1250 * consider a file with 3 stripes:
1253 * ---------------------------------------------------------------------
1254 * | 0 | 1 | 2 | 0 | 1 | 2 |
1255 * ---------------------------------------------------------------------
1257 * to find stripe 1's offsets for S and E, it divides by the full stripe
1258 * width and does its math in the context of a single set of stripes:
1261 * -----------------------------------
1263 * -----------------------------------
1265 * it'll notice that E is outside stripe 1 and clamp it to the end of the
1266 * stripe, then multiply it back out by lov_off to give the real offsets in
1270 * ---------------------------------------------------------------------
1271 * | 1 | 1 | 1 | 1 | 1 | 1 |
1272 * ---------------------------------------------------------------------
1274 * it would have done similarly and pulled S forward to the start of a 1
1275 * stripe if, say, S had landed in a 0 stripe.
1277 * this rounding isn't always correct. consider an E lov offset that lands
1278 * on a 0 stripe, the "mod stripe width" math will pull it forward to the
1279 * start of a 1 stripe, when in fact it wanted to be rounded back to the end
1280 * of a previous 1 stripe. this logic is handled by callers and this is why:
1282 * this function returns < 0 when the offset was "before" the stripe and
1283 * was moved forward to the start of the stripe in question; 0 when it
1284 * falls in the stripe and no shifting was done; > 0 when the offset
1285 * was outside the stripe and was pulled back to its final byte. */
/* lsm:      striping descriptor supplying stripe size and stripe count
 * lov_off:  offset within the whole file; OBD_OBJECT_EOF is passed through
 *           unchanged
 * stripeno: index of the stripe to translate into
 * obd_off:  out parameter receiving the offset within the stripe object */
1286 static int lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
1287 int stripeno, obd_off *obd_off)
1289 unsigned long ssize = lsm->lsm_stripe_size;
1290 unsigned long swidth = ssize * lsm->lsm_stripe_count;
1291 unsigned long stripe_off, this_stripe;
1294 if (lov_off == OBD_OBJECT_EOF) {
1295 *obd_off = OBD_OBJECT_EOF;
1299 /* do_div(a, b) returns a % b, and a = a / b */
/* afterwards lov_off counts whole stripe widths and stripe_off is the
 * position inside the current stripe width */
1300 stripe_off = do_div(lov_off, swidth);
/* byte position at which this stripe begins inside one stripe width */
1302 this_stripe = stripeno * ssize;
1303 if (stripe_off < this_stripe) {
1307 stripe_off -= this_stripe;
1309 if (stripe_off >= ssize) {
/* each whole stripe width contributes one stripe-size chunk to the object */
1315 *obd_off = lov_off * ssize + stripe_off;
1319 /* given an extent in an lov and a stripe, calculate the extent of the stripe
1320 * that is contained within the lov extent. this returns true if the given
1321 * stripe does intersect with the lov extent. */
/* lsm:       striping descriptor
 * stripeno:  index of the stripe being tested
 * start/end: extent within the whole file
 * obd_start/obd_end: out parameters receiving the matching extent within
 *            the stripe object, as computed by lov_stripe_offset() */
1322 static int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno,
1323 obd_off start, obd_off end,
1324 obd_off *obd_start, obd_off *obd_end)
1326 int start_side, end_side;
/* the sign of each result records in which direction the offset was moved */
1328 start_side = lov_stripe_offset(lsm, start, stripeno, obd_start);
1329 end_side = lov_stripe_offset(lsm, end, stripeno, obd_end);
1331 CDEBUG(D_INODE, "["LPU64"->"LPU64"] -> [(%d) "LPU64"->"LPU64" (%d)]\n",
1332 start, end, start_side, *obd_start, *obd_end, end_side);
1334 /* this stripe doesn't intersect the file extent when neither
1335 * start or the end intersected the stripe and obd_start and
1336 * obd_end got rounded up to the same value. */
1337 if (start_side != 0 && end_side != 0 && *obd_start == *obd_end)
1340 /* as mentioned in the lov_stripe_offset commentary, end
1341 * might have been shifted in the wrong direction. This
1342 * happens when an end offset is before the stripe when viewed
1343 * through the "mod stripe size" math. we detect it being shifted
1344 * in the wrong direction and touch it up.
1345 * interestingly, this can't underflow since end must be > start
1346 * if we passed through the previous check.
1347 * (should we assert for that somewhere?) */
1354 /* compute which stripe number "lov_off" will be written into */
/* The result is the remainder of lov_off modulo the stripe width, divided
 * by the stripe size.
 * NOTE(review): a zero stripe size or stripe count would make either
 * do_div() or the final division divide by zero; confirm callers only pass
 * a validated lsm. */
1355 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
1357 unsigned long ssize = lsm->lsm_stripe_size;
1358 unsigned long swidth = ssize * lsm->lsm_stripe_count;
1359 unsigned long stripe_off;
/* do_div() leaves the quotient in lov_off and returns the remainder */
1361 stripe_off = do_div(lov_off, swidth);
1363 return stripe_off / ssize;
/* Punch the file extent [start, end] by issuing obd_punch() against the
 * object of every stripe that the extent intersects.  oa supplies the
 * attributes copied into each per-stripe request; only the object id (and
 * the handle bits) are replaced per stripe.  oti is not forwarded to
 * obd_punch() in the call visible below. */
1366 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1367 * we can send this 'punch' to just the authoritative node and the nodes
1368 * that the punch will affect. */
1369 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
1370 struct lov_stripe_md *lsm,
1371 obd_off start, obd_off end, struct obd_trans_info *oti)
1374 struct obd_export *export = class_conn2export(conn);
1375 struct lov_obd *lov;
1376 struct lov_oinfo *loi;
1377 struct lov_file_handles *lfh = NULL;
1381 if (lsm_bad_magic(lsm))
1382 GOTO(out, rc = -EINVAL);
1384 if (!export || !export->exp_obd)
1385 GOTO(out, rc = -ENODEV);
/* the caller attached a file handle to oa: look up the per-stripe handles */
1387 if (oa->o_valid & OBD_MD_FLHANDLE)
1388 lfh = lov_handle2lfh(obdo_handle(oa));
1390 lov = &export->exp_obd->u.lov;
/* walk the stripes; loi advances in step with the stripe index i */
1391 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1392 obd_off starti, endi;
1395 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1396 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
/* starti/endi receive the part of the extent that falls in stripe i,
 * expressed as offsets within that stripe's object */
1400 if (!lov_stripe_intersects(lsm, i, start, end, &starti, &endi))
1403 /* create data objects with "parent" OA */
1404 memcpy(&tmp, oa, sizeof(tmp));
1405 tmp.o_id = loi->loi_id;
/* NOTE(review): lfh stays NULL unless OBD_MD_FLHANDLE was set above;
 * confirm the handle copy below is guarded accordingly */
1407 memcpy(obdo_handle(&tmp), lfh->lfh_och + i,
1410 tmp.o_valid &= ~OBD_MD_FLHANDLE;
1412 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
1413 starti, endi, NULL);
/* only report failures from targets that are still marked active */
1415 if (lov->tgts[loi->loi_ost_idx].active) {
1416 CERROR("error: punch objid "LPX64" subobj "LPX64
1417 " on OST idx %d: rc = %d\n", oa->o_id,
1418 loi->loi_id, loi->loi_ost_idx, err);
/* pairs with class_conn2export() at the top (presumably drops the
 * reference it took -- TODO confirm) */
1428 class_export_put(export);
/* Pre-flight check for a bulk I/O (used for OBD_BRW_CHECK): for each of the
 * oa_bufs pages in pga, find the stripe the page lands in and look at the
 * target that backs it, logging targets that are marked inactive.
 *
 * Fixes relative to the previous revision:
 *  - ext.end was never assigned (ext.start was written twice), so an
 *    uninitialised value was passed to lov_stripe_intersects();
 *  - the page index i was passed where a stripe number is expected, so the
 *    intersection test was made against the wrong stripe. */
1432 static int lov_brw_check(struct lov_obd *lov, struct lov_stripe_md *lsm,
1433 obd_count oa_bufs, struct brw_page *pga)
1437 /* The caller just wants to know if there's a chance that this
1438 * I/O can succeed */
1439 for (i = 0; i < oa_bufs; i++) {
/* stripe that holds this page, and the target index backing that stripe */
1440 int stripe = lov_stripe_number(lsm, pga[i].off);
1441 int ost = lsm->lsm_oinfo[stripe].loi_ost_idx;
1442 struct ldlm_extent ext, subext;
/* file extent covered by this page */
1443 ext.start = pga[i].off;
1444 ext.end = pga[i].off + pga[i].count;
/* test against the stripe the page belongs to, not the page index */
1446 if (!lov_stripe_intersects(lsm, stripe, ext.start, ext.end,
1447 &subext.start, &subext.end))
1450 if (lov->tgts[ost].active == 0) {
1451 CDEBUG(D_HA, "lov idx %d inactive\n", ost);
/* Synchronous bulk read/write: regroup the oa_bufs pages in pga by the
 * stripe each one belongs to, rewrite their offsets to object offsets, and
 * issue one obd_brw() per stripe.  OBD_BRW_CHECK requests are delegated to
 * lov_brw_check().
 * NOTE(review): unlike lov_punch() and friends, export and export->exp_obd
 * are dereferenced here without a NULL check, and lsm->lsm_stripe_count is
 * read in the declarations before lsm_bad_magic(lsm) runs -- confirm both
 * are safe. */
1458 static int lov_brw(int cmd, struct lustre_handle *conn,
1459 struct lov_stripe_md *lsm, obd_count oa_bufs,
1460 struct brw_page *pga, struct obd_trans_info *oti)
/* per-stripe bookkeeping; as used below: bufct = pages destined for the
 * stripe, index = first slot of the stripe in ioarr, subcount = pages placed
 * so far, ost_idx = target index, lsm = md handed to the target */
1466 struct lov_stripe_md lsm;
1468 } *stripeinfo, *si, *si_last;
1469 struct obd_export *export = class_conn2export(conn);
1470 struct lov_obd *lov;
1471 struct brw_page *ioarr;
1472 struct lov_oinfo *loi;
1473 int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1476 if (lsm_bad_magic(lsm))
1477 GOTO(out_exp, rc = -EINVAL);
1479 lov = &export->exp_obd->u.lov;
1481 if (cmd == OBD_BRW_CHECK) {
1482 rc = lov_brw_check(lov, lsm, oa_bufs, pga);
/* the counters in stripeinfo are incremented without being set first, so
 * this presumably relies on OBD_ALLOC returning zeroed memory -- TODO
 * confirm */
1486 OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1488 GOTO(out_exp, rc = -ENOMEM);
1490 OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1492 GOTO(out_sinfo, rc = -ENOMEM);
1494 OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1496 GOTO(out_where, rc = -ENOMEM);
/* pass 1: record each page's stripe and count the pages per stripe */
1498 for (i = 0; i < oa_bufs; i++) {
1499 where[i] = lov_stripe_number(lsm, pga[i].off);
1500 stripeinfo[where[i]].bufct++;
/* pass 2: running sum of the counts gives each stripe's first slot in
 * ioarr; si_last trails si by one entry (both start at stripeinfo[0]) */
1503 for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1504 i < stripe_count; i++, loi++, si_last = si, si++) {
1506 si->index = si_last->index + si_last->bufct;
1507 si->lsm.lsm_object_id = loi->loi_id;
1508 si->ost_idx = loi->loi_ost_idx;
/* pass 3: copy every page into its stripe's region of ioarr and replace
 * the file offset with the offset inside the stripe's object */
1511 for (i = 0; i < oa_bufs; i++) {
1512 int which = where[i];
1515 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1516 LASSERT(shift < oa_bufs);
1517 ioarr[shift] = pga[i];
1518 lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
1519 stripeinfo[which].subcount++;
/* pass 4: hand each stripe's pages to its target */
1522 for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1523 int shift = si->index;
/* an inactive target fails the request with -EIO */
1525 if (lov->tgts[si->ost_idx].active == 0) {
1526 CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
1527 GOTO(out_ioarr, rc = -EIO);
1531 LASSERT(shift < oa_bufs);
1532 rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
1533 &si->lsm, si->bufct, &ioarr[shift],
1536 GOTO(out_ioarr, rc);
1539 GOTO(out_ioarr, rc);
/* release the scratch arrays in reverse order of allocation */
1541 OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1543 OBD_FREE(where, sizeof(*where) * oa_bufs);
1545 OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1547 class_export_put(export);
/* Callback installed as set->set_interpret by lov_brw_async(): frees the
 * regrouped page array that lov_brw_async() stored in the async args
 * (aa_ioarr, with aa_oa_bufs entries). */
1551 static int lov_brw_interpret (struct ptlrpc_request_set *set,
1552 struct lov_brw_async_args *aa, int rc)
1554 obd_count oa_bufs = aa->aa_oa_bufs;
1555 struct brw_page *ioarr = aa->aa_ioarr;
/* the size must match the OBD_ALLOC of ioarr in lov_brw_async() */
1558 OBD_FREE (ioarr, sizeof (*ioarr) * oa_bufs);
/* Asynchronous counterpart of lov_brw(): regroup the pages in pga by
 * stripe, issue one obd_brw_async() per stripe, and on success register
 * lov_brw_interpret() on the request set so the regrouped page array is
 * freed from the callback rather than here.
 * NOTE(review): export and export->exp_obd are dereferenced without a NULL
 * check, and lsm->lsm_stripe_count is read before lsm_bad_magic(lsm) runs --
 * confirm both are safe. */
1562 static int lov_brw_async(int cmd, struct lustre_handle *conn,
1563 struct lov_stripe_md *lsm, obd_count oa_bufs,
1564 struct brw_page *pga, struct ptlrpc_request_set *set,
1565 struct obd_trans_info *oti)
/* per-stripe bookkeeping; as used below: bufct = pages destined for the
 * stripe, index = first slot of the stripe in ioarr, subcount = pages placed
 * so far, ost_idx = target index, lsm = md handed to the target */
1571 struct lov_stripe_md lsm;
1573 } *stripeinfo, *si, *si_last;
1574 struct obd_export *export = class_conn2export(conn);
1575 struct lov_obd *lov;
1576 struct brw_page *ioarr;
1577 struct lov_oinfo *loi;
1578 struct lov_brw_async_args *aa;
1579 int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1582 if (lsm_bad_magic(lsm))
1583 GOTO(out_exp, rc = -EINVAL);
1585 lov = &export->exp_obd->u.lov;
1587 if (cmd == OBD_BRW_CHECK) {
1588 rc = lov_brw_check(lov, lsm, oa_bufs, pga);
/* the counters in stripeinfo are incremented without being set first, so
 * this presumably relies on OBD_ALLOC returning zeroed memory -- TODO
 * confirm */
1592 OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1594 GOTO(out_exp, rc = -ENOMEM);
1596 OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1598 GOTO(out_sinfo, rc = -ENOMEM);
1600 OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1602 GOTO(out_where, rc = -ENOMEM);
/* pass 1: record each page's stripe and count the pages per stripe */
1604 for (i = 0; i < oa_bufs; i++) {
1605 where[i] = lov_stripe_number(lsm, pga[i].off);
1606 stripeinfo[where[i]].bufct++;
/* pass 2: running sum of the counts gives each stripe's first slot in
 * ioarr; si_last trails si by one entry (both start at stripeinfo[0]) */
1609 for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1610 i < stripe_count; i++, loi++, si_last = si, si++) {
1612 si->index = si_last->index + si_last->bufct;
1613 si->lsm.lsm_object_id = loi->loi_id;
1614 si->ost_idx = loi->loi_ost_idx;
/* pass 3: copy every page into its stripe's region of ioarr and replace
 * the file offset with the offset inside the stripe's object */
1617 for (i = 0; i < oa_bufs; i++) {
1618 int which = where[i];
1621 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1622 LASSERT(shift < oa_bufs);
1623 ioarr[shift] = pga[i];
1624 lov_stripe_offset(lsm, pga[i].off, which, &ioarr[shift].off);
1625 stripeinfo[which].subcount++;
/* pass 4: queue each stripe's pages on its target */
1628 for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1629 int shift = si->index;
/* an inactive target fails the request with -EIO */
1634 if (lov->tgts[si->ost_idx].active == 0) {
1635 CDEBUG(D_HA, "lov idx %d inactive\n", si->ost_idx);
1636 GOTO(out_ioarr, rc = -EIO);
1639 LASSERT(shift < oa_bufs);
1640 rc = obd_brw_async(cmd, &lov->tgts[si->ost_idx].conn,
1641 &si->lsm, si->bufct, &ioarr[shift],
/* NOTE(review): this error exit frees ioarr; verify that no request already
 * queued for an earlier stripe can still reference it */
1644 GOTO(out_ioarr, rc);
/* success: stash ioarr and its length in the set's argument area so that
 * lov_brw_interpret() can free it; the assertions check that the set has no
 * other interpreter and that the argument area is large enough */
1647 LASSERT (set->set_interpret == NULL);
1648 set->set_interpret = lov_brw_interpret;
1649 LASSERT (sizeof (set->set_args) >= sizeof (struct lov_brw_async_args));
1650 aa = (struct lov_brw_async_args *)&set->set_args;
1651 aa->aa_oa_bufs = oa_bufs;
1652 aa->aa_ioarr = ioarr;
/* jump past the ioarr free: ownership moved to the callback */
1653 GOTO(out_where, rc);
1655 OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1657 OBD_FREE(where, sizeof(*where) * oa_bufs);
1659 OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
1661 class_export_put(export);
/* Enqueue a lock on every stripe that intersects the requested extent.
 * cookie is interpreted as a struct ldlm_extent describing the file extent;
 * each stripe's target is asked for the matching sub-extent via
 * obd_enqueue().  For files with more than one stripe the per-stripe lock
 * handles live in a lov_lock_handles container whose cookie is returned
 * through lockh.  If a stripe fails, the locks obtained so far are
 * cancelled again.
 * NOTE(review): cookie is cast to struct ldlm_extent regardless of type;
 * confirm callers only pass extent locks. */
1665 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1666 struct lustre_handle *parent_lock,
1667 __u32 type, void *cookie, int cookielen, __u32 mode,
1668 int *flags, void *cb, void *data,
1669 struct lustre_handle *lockh)
1671 struct obd_export *export = class_conn2export(conn);
1672 struct lov_lock_handles *lov_lockh = NULL;
1673 struct lustre_handle *lov_lockhp;
1674 struct lov_obd *lov;
1675 struct lov_oinfo *loi;
1676 struct lov_stripe_md submd;
1681 if (lsm_bad_magic(lsm))
1682 GOTO(out_exp, rc = -EINVAL);
1684 /* we should never be asked to replay a lock this way. */
1685 LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1687 if (!export || !export->exp_obd)
1688 GOTO(out_exp, rc = -ENODEV);
/* multi-stripe file: allocate the handle container and publish its cookie
 * through lockh; lov_lockhp then walks the container's per-stripe slots */
1690 if (lsm->lsm_stripe_count > 1) {
1691 lov_lockh = lov_llh_new(lsm);
1692 if (lov_lockh == NULL)
1693 GOTO(out_exp, rc = -ENOMEM);
1695 lockh->cookie = lov_lockh->llh_handle.h_cookie;
1696 lov_lockhp = lov_lockh->llh_handles;
1701 lov = &export->exp_obd->u.lov;
/* i, loi and lov_lockhp advance together: stripe index, stripe object info
 * and the slot that receives that stripe's lock handle */
1702 for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1703 i++, loi++, lov_lockhp++) {
1704 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1705 struct ldlm_extent sub_ext;
/* sub_ext receives the part of the extent that falls in stripe i */
1708 if (!lov_stripe_intersects(lsm, i, extent->start, extent->end,
1709 &sub_ext.start, &sub_ext.end))
1712 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1713 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1717 /* XXX LOV STACKING: submd should be from the subobj */
1718 submd.lsm_object_id = loi->loi_id;
1719 submd.lsm_stripe_count = 0;
1720 /* XXX submd is not fully initialized here */
1722 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1723 parent_lock, type, &sub_ext, sizeof(sub_ext),
1724 mode, flags, cb, data, lov_lockhp);
1726 // XXX add a lock debug statement here
/* failure: zero this stripe's slot so that the unwind loop below skips it,
 * and only log if the target is still active */
1727 if (rc != ELDLM_OK) {
1728 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
1729 if (lov->tgts[loi->loi_ost_idx].active) {
1730 CERROR("error: enqueue objid "LPX64" subobj "
1731 LPX64" on OST idx %d: rc = %d\n",
1732 lsm->lsm_object_id, loi->loi_id,
1733 loi->loi_ost_idx, rc);
/* success exit: put the container obtained from lov_llh_new() (presumably
 * dropping this function's reference -- TODO confirm) */
1738 if (lsm->lsm_stripe_count > 1)
1739 lov_llh_put(lov_lockh);
1740 GOTO(out_exp, rc = ELDLM_OK);
/* unwind: step backwards over the stripes already visited and cancel every
 * lock whose handle slot is non-zero */
1743 while (loi--, lov_lockhp--, i-- > 0) {
1744 struct lov_stripe_md submd;
1747 if (lov_lockhp->cookie == 0)
1750 /* XXX LOV STACKING: submd should be from the subobj */
1751 submd.lsm_object_id = loi->loi_id;
1752 submd.lsm_stripe_count = 0;
1753 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1755 if (err && lov->tgts[loi->loi_ost_idx].active) {
1756 CERROR("error: cancelling objid "LPX64" on OST "
1757 "idx %d after enqueue error: rc = %d\n",
1758 loi->loi_id, loi->loi_ost_idx, err);
/* error exit for multi-stripe files: destroy the container, then put it */
1762 if (lsm->lsm_stripe_count > 1) {
1763 lov_llh_destroy(lov_lockh);
1764 lov_llh_put(lov_lockh);
1767 class_export_put(export);
/* Look for already-held locks covering the requested extent on every stripe
 * that intersects it, by calling obd_match() on each stripe's target.
 * cookie is interpreted as a struct ldlm_extent.  For files with more than
 * one stripe the per-stripe handles live in a lov_lock_handles container
 * whose cookie is returned through lockh.  The unwind loop near the end
 * cancels the per-stripe locks collected so far ("after match failure").
 * NOTE(review): cookie is cast to struct ldlm_extent regardless of type;
 * confirm callers only pass extent locks. */
1771 static int lov_match(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1772 __u32 type, void *cookie, int cookielen, __u32 mode,
1773 int *flags, void *data, struct lustre_handle *lockh)
1775 struct obd_export *export = class_conn2export(conn);
1776 struct lov_lock_handles *lov_lockh = NULL;
1777 struct lustre_handle *lov_lockhp;
1778 struct lov_obd *lov;
1779 struct lov_oinfo *loi;
1780 struct lov_stripe_md submd;
1781 ldlm_error_t rc = 0;
1785 if (lsm_bad_magic(lsm))
1786 GOTO(out_exp, rc = -EINVAL);
1788 if (!export || !export->exp_obd)
1789 GOTO(out_exp, rc = -ENODEV);
/* multi-stripe file: allocate the handle container and publish its cookie
 * through lockh; lov_lockhp then walks the container's per-stripe slots */
1791 if (lsm->lsm_stripe_count > 1) {
1792 lov_lockh = lov_llh_new(lsm);
1793 if (lov_lockh == NULL)
1794 GOTO(out_exp, rc = -ENOMEM);
1796 lockh->cookie = lov_lockh->llh_handle.h_cookie;
1797 lov_lockhp = lov_lockh->llh_handles;
1802 lov = &export->exp_obd->u.lov;
/* i, loi and lov_lockhp advance together: stripe index, stripe object info
 * and the slot that receives that stripe's lock handle */
1803 for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1804 i++, loi++, lov_lockhp++) {
1805 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1806 struct ldlm_extent sub_ext;
/* sub_ext receives the part of the extent that falls in stripe i */
1809 if (!lov_stripe_intersects(lsm, i, extent->start, extent->end,
1810 &sub_ext.start, &sub_ext.end))
1813 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1814 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1819 /* XXX LOV STACKING: submd should be from the subobj */
1820 submd.lsm_object_id = loi->loi_id;
1821 submd.lsm_stripe_count = 0;
1823 /* XXX submd is not fully initialized here */
/* the target is given lov_flags rather than the caller's flags pointer */
1824 rc = obd_match(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1825 type, &sub_ext, sizeof(sub_ext), mode,
1826 &lov_flags, data, lov_lockhp);
/* put the container obtained from lov_llh_new() (presumably dropping this
 * function's reference -- TODO confirm) */
1831 if (lsm->lsm_stripe_count > 1)
1832 lov_llh_put(lov_lockh);
/* unwind: step backwards over the stripes already visited and cancel every
 * lock whose handle slot is non-zero */
1836 while (loi--, lov_lockhp--, i-- > 0) {
1837 struct lov_stripe_md submd;
1840 if (lov_lockhp->cookie == 0)
1843 /* XXX LOV STACKING: submd should be from the subobj */
1844 submd.lsm_object_id = loi->loi_id;
1845 submd.lsm_stripe_count = 0;
1846 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1848 if (err && lov->tgts[loi->loi_ost_idx].active) {
1849 CERROR("error: cancelling objid "LPX64" on OST "
1850 "idx %d after match failure: rc = %d\n",
1851 loi->loi_id, loi->loi_ost_idx, err);
/* failure exit for multi-stripe files: destroy the container, then put it */
1855 if (lsm->lsm_stripe_count > 1) {
1856 lov_llh_destroy(lov_lockh);
1857 lov_llh_put(lov_lockh);
1860 class_export_put(export);
/* Cancel the lock identified by lockh on every stripe of lsm by calling
 * obd_cancel() on each stripe's target.  For files with more than one
 * stripe, lockh names a lov_lock_handles container holding one handle per
 * stripe; that container is destroyed and released at the end. */
1864 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1865 __u32 mode, struct lustre_handle *lockh)
1867 struct obd_export *export = class_conn2export(conn);
1868 struct lov_lock_handles *lov_lockh = NULL;
1869 struct lustre_handle *lov_lockhp;
1870 struct lov_obd *lov;
1871 struct lov_oinfo *loi;
1875 if (lsm_bad_magic(lsm))
1876 GOTO(out, rc = -EINVAL);
1878 if (!export || !export->exp_obd)
1879 GOTO(out, rc = -ENODEV);
/* multi-stripe file: resolve lockh to its container of per-stripe handles;
 * an unresolvable handle is reported and rejected with -EINVAL */
1882 if (lsm->lsm_stripe_count > 1) {
1883 lov_lockh = lov_handle2llh(lockh);
1885 CERROR("LOV: invalid lov lock handle %p\n", lockh);
1886 GOTO(out, rc = -EINVAL);
1889 lov_lockhp = lov_lockh->llh_handles;
1894 lov = &export->exp_obd->u.lov;
/* i, loi and lov_lockhp advance together over the stripes */
1895 for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1896 i++, loi++, lov_lockhp++) {
1897 struct lov_stripe_md submd;
/* a zero cookie means no lock was recorded for this stripe */
1900 if (lov_lockhp->cookie == 0) {
1901 CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
1902 loi->loi_ost_idx, loi->loi_id);
1906 /* XXX LOV STACKING: submd should be from the subobj */
1907 submd.lsm_object_id = loi->loi_id;
1908 submd.lsm_stripe_count = 0;
1909 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
/* only report failures from targets that are still marked active */
1912 if (lov->tgts[loi->loi_ost_idx].active) {
1913 CERROR("error: cancel objid "LPX64" subobj "
1914 LPX64" on OST idx %d: rc = %d\n",
1916 loi->loi_id, loi->loi_ost_idx, err);
/* the container is destroyed for multi-stripe files, and put whenever one
 * was looked up above */
1923 if (lsm->lsm_stripe_count > 1)
1924 lov_llh_destroy(lov_lockh);
1925 if (lov_lockh != NULL)
1926 lov_llh_put(lov_lockh);
1929 class_export_put(export);
/* Call obd_cancel_unused() for the object of every stripe in lsm, passing
 * flags and opaque through unchanged.  Failures are logged only for targets
 * that are still marked active. */
1933 static int lov_cancel_unused(struct lustre_handle *conn,
1934 struct lov_stripe_md *lsm, int flags, void *opaque)
1936 struct obd_export *export = class_conn2export(conn);
1937 struct lov_obd *lov;
1938 struct lov_oinfo *loi;
1942 if (lsm_bad_magic(lsm))
1943 GOTO(out, rc = -EINVAL);
1945 if (!export || !export->exp_obd)
1946 GOTO(out, rc = -ENODEV);
1948 lov = &export->exp_obd->u.lov;
1949 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1950 struct lov_stripe_md submd;
/* an inactive target is logged; the request below is still issued to it */
1953 if (lov->tgts[loi->loi_ost_idx].active == 0)
1954 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
/* minimal per-stripe md: only the object id and a zero stripe count */
1956 submd.lsm_object_id = loi->loi_id;
1957 submd.lsm_stripe_count = 0;
1958 err = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
1959 &submd, flags, opaque);
1960 if (err && lov->tgts[loi->loi_ost_idx].active) {
1961 CERROR("error: cancel unused objid "LPX64" subobj "LPX64
1962 " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1963 loi->loi_id, loi->loi_ost_idx, err);
1970 class_export_put(export);
/* LOV_U64_MAX is the all-ones 64-bit value.  LOV_SUM_MAX(tot, add) is used
 * by lov_statfs() to accumulate counters: when tot + add wraps around (the
 * sum compares smaller than tot) tot is clamped to LOV_U64_MAX.  Both
 * arguments are evaluated more than once, so they must be free of side
 * effects. */
1974 #define LOV_U64_MAX ((__u64)~0ULL)
1975 #define LOV_SUM_MAX(tot, add) \
1977 if ((tot) + (add) < (tot)) \
1978 (tot) = LOV_U64_MAX; \
/* Build filesystem statistics for the whole LOV in *osfs by calling
 * obd_statfs() on each active target and combining the results: one result
 * is copied wholesale, block counters of the others are added, and the
 * file counters are added with saturation and then divided by the expected
 * stripe count. */
1983 static int lov_statfs(struct obd_export *export, struct obd_statfs *osfs)
1985 struct obd_export *tgt_export;
1986 struct lov_obd *lov;
1987 struct obd_statfs lov_sfs;
1993 if (!export || !export->exp_obd)
1996 lov = &export->exp_obd->u.lov;
1998 /* We only get block data from the OBD */
1999 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
2002 if (!lov->tgts[i].active) {
2003 CDEBUG(D_HA, "lov idx %d inactive\n", i);
/* resolve the target's connection to an export for the statfs call */
2007 tgt_export = class_conn2export(&lov->tgts[i].conn);
2009 CDEBUG(D_HA, "lov idx %d NULL export\n", i);
/* the export is only needed for the duration of the call */
2013 err = obd_statfs(tgt_export, &lov_sfs);
2014 class_export_put(tgt_export);
/* only report failures from targets that are still marked active */
2016 if (lov->tgts[i].active) {
2017 CERROR("error: statfs OSC %s on OST idx %d: "
2019 lov->tgts[i].uuid.uuid, i, err);
/* take one target's result as the starting point ... */
2026 memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
/* ... and accumulate the block counters of the others */
2029 osfs->os_bfree += lov_sfs.os_bfree;
2030 osfs->os_bavail += lov_sfs.os_bavail;
2031 osfs->os_blocks += lov_sfs.os_blocks;
2032 /* XXX not sure about this one - depends on policy.
2033 * - could be minimum if we always stripe on all OBDs
2034 * (but that would be wrong for any other policy,
2035 * if one of the OBDs has no more objects left)
2036 * - could be sum if we stripe whole objects
2037 * - could be average, just to give a nice number
2039 * To give a "reasonable" (if not wholly accurate)
2040 * number, we divide the total number of free objects
2041 * by expected stripe count (watch out for overflow).
2043 LOV_SUM_MAX(osfs->os_files, lov_sfs.os_files);
2044 LOV_SUM_MAX(osfs->os_ffree, lov_sfs.os_ffree);
/* expected stripes per file: the configured default, or the number of
 * active targets when no default is set.
 * NOTE(review): if both values are 0 the do_div() calls below divide by
 * zero -- confirm this cannot happen on this path. */
2048 __u32 expected_stripes = lov->desc.ld_default_stripe_count ?
2049 lov->desc.ld_default_stripe_count :
2050 lov->desc.ld_active_tgt_count;
/* saturated totals are left at LOV_U64_MAX rather than divided */
2052 if (osfs->os_files != LOV_U64_MAX)
2053 do_div(osfs->os_files, expected_stripes);
2054 if (osfs->os_ffree != LOV_U64_MAX)
2055 do_div(osfs->os_ffree, expected_stripes);
/* ioctl entry point of the LOV device.  Handles IOC_LOV_SET_OSC_ACTIVE,
 * OBD_IOC_LOV_GET_CONFIG, LL_IOC_LOV_SETSTRIPE and LL_IOC_LOV_GETSTRIPE
 * itself; the remaining code forwards cmd to every target with
 * obd_iocontrol().
 * NOTE(review): the obd_device returned by class_conn2obd() is dereferenced
 * in the declarations without a NULL check -- confirm it cannot be NULL. */
2061 static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
2062 void *karg, void *uarg)
2064 struct obd_device *obddev = class_conn2obd(conn);
2065 struct lov_obd *lov = &obddev->u.lov;
2066 int i, count = lov->desc.ld_tgt_count;
2067 struct obd_uuid *uuidp;
/* mark the target named by the uuid in inlbuf1 according to ioc_offset */
2073 case IOC_LOV_SET_OSC_ACTIVE: {
2074 struct obd_ioctl_data *data = karg;
2075 uuidp = (struct obd_uuid *)data->ioc_inlbuf1;
2076 rc = lov_set_osc_active(lov, uuidp, data->ioc_offset);
/* return the lov descriptor (inlbuf1) and the target uuids (inlbuf2) */
2079 case OBD_IOC_LOV_GET_CONFIG: {
2080 struct obd_ioctl_data *data = karg;
2081 struct lov_tgt_desc *tgtdesc;
2082 struct lov_desc *desc;
/* presumably copies the user's ioctl buffer into buf and updates len --
 * TODO confirm against obd_ioctl_getdata() */
2087 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2090 data = (struct obd_ioctl_data *)buf;
/* both inline buffers must be large enough for what is written below */
2092 if (sizeof(*desc) > data->ioc_inllen1) {
2097 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
2102 desc = (struct lov_desc *)data->ioc_inlbuf1;
2103 memcpy(desc, &(lov->desc), sizeof(*desc));
2105 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
2106 tgtdesc = lov->tgts;
2107 for (i = 0; i < count; i++, uuidp++, tgtdesc++)
2108 obd_str2uuid(uuidp, tgtdesc->uuid.uuid);
/* hand the filled-in buffer back to user space, then release it */
2110 rc = copy_to_user((void *)uarg, buf, len);
2113 obd_ioctl_freedata(buf, len);
2116 case LL_IOC_LOV_SETSTRIPE:
2117 rc = lov_setstripe(conn, karg, uarg);
2119 case LL_IOC_LOV_GETSTRIPE:
2120 rc = lov_getstripe(conn, karg, uarg);
/* any other command: pass it on to every target */
2127 for (i = 0; i < count; i++) {
2130 err = obd_iocontrol(cmd, &lov->tgts[i].conn,
/* only report failures from targets that are still marked active */
2133 if (lov->tgts[i].active) {
2134 CERROR("error: iocontrol OSC %s on OST"
2135 "idx %d: err = %d\n",
2136 lov->tgts[i].uuid.uuid, i, err);
/* Key/value query interface.  The only key visible here is
 * "lock_to_stripe": val is treated as a __u32 stripe number output and
 * *vallen is set to its size; the stripe is found by comparing the
 * connection cookie of each stripe's target with that of the lock's
 * connection. */
2151 static int lov_get_info(struct lustre_handle *conn, __u32 keylen,
2152 void *key, __u32 *vallen, void *val)
2154 struct obd_device *obddev = class_conn2obd(conn);
2155 struct lov_obd *lov = &obddev->u.lov;
2159 if (!vallen || !val)
/* NOTE(review): the strict '>' presumably expects keylen to include the
 * terminating NUL -- confirm against callers */
2162 if (keylen > strlen("lock_to_stripe") &&
2163 strcmp(key, "lock_to_stripe") == 0) {
2166 struct ldlm_lock *lock;
2167 struct lov_stripe_md *lsm;
2169 __u32 *stripe = val;
2170 struct lov_oinfo *loi;
/* the caller's buffer must be able to hold one stripe number */
2172 if (*vallen < sizeof(*stripe))
2174 *vallen = sizeof(*stripe);
2176 /* XXX This is another one of those bits that will need to
2177 * change if we ever actually support nested LOVs. It uses
2178 * the lock's connection to find out which stripe it is. */
2179 for (i = 0, loi = data->lsm->lsm_oinfo;
2180 i < data->lsm->lsm_stripe_count;
2182 if (lov->tgts[loi->loi_ost_idx].conn.cookie ==
2183 data->lock->l_connh->cookie) {
/* Mark one page dirty.  offset is a page index within the file (it is
 * shifted by PAGE_CACHE_SHIFT to form a byte offset); it is mapped to the
 * stripe that holds it and to a page index within that stripe's object, and
 * obd_mark_page_dirty() is then called on the stripe's target with a
 * temporary single-stripe md.
 * NOTE(review): the obd_device returned by class_conn2obd() is dereferenced
 * without a NULL check -- confirm it cannot be NULL. */
2194 static int lov_mark_page_dirty(struct lustre_handle *conn,
2195 struct lov_stripe_md *lsm, unsigned long offset)
2197 struct lov_obd *lov = &class_conn2obd(conn)->u.lov;
2198 struct lov_oinfo *loi;
2199 struct lov_stripe_md *submd;
2204 if (lsm_bad_magic(lsm))
/* temporary md sized for exactly one stripe */
2207 OBD_ALLOC(submd, lov_stripe_md_size(1));
/* page index -> byte offset -> (stripe, byte offset in object) */
2211 stripe = lov_stripe_number(lsm, (obd_off)offset << PAGE_CACHE_SHIFT);
2212 lov_stripe_offset(lsm, (obd_off)offset << PAGE_CACHE_SHIFT, stripe,
/* back to a page index, now relative to the stripe's object */
2214 off >>= PAGE_CACHE_SHIFT;
2216 loi = &lsm->lsm_oinfo[stripe];
2217 CDEBUG(D_INODE, "off %lu => off %lu on stripe %d\n", offset,
2218 (unsigned long)off, stripe);
/* let the temporary md refer to the stripe's own loi_dirty_ot_inline */
2219 submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
2221 rc = obd_mark_page_dirty(&lov->tgts[loi->loi_ost_idx].conn, submd, off);
2222 OBD_FREE(submd, lov_stripe_md_size(1));
/* Clear the dirty state of the pages in the file page range [start, end].
 * For every stripe that intersects the range, the range is mapped to page
 * indexes within the stripe's object and obd_clear_dirty_pages() is called
 * on the stripe's target; the per-target counts are summed into *cleared. */
2226 static int lov_clear_dirty_pages(struct lustre_handle *conn,
2227 struct lov_stripe_md *lsm, unsigned long start,
2228 unsigned long end, unsigned long *cleared)
2231 struct obd_export *export = class_conn2export(conn);
/* byte offsets corresponding to the page indexes start and end */
2232 __u64 start_off = (__u64)start << PAGE_CACHE_SHIFT;
2233 __u64 end_off = (__u64)end << PAGE_CACHE_SHIFT;
2234 __u64 obd_start, obd_end;
2235 struct lov_stripe_md *submd = NULL;
2236 struct lov_obd *lov;
2237 struct lov_oinfo *loi;
2239 unsigned long osc_cleared;
2244 if (lsm_bad_magic(lsm))
2245 GOTO(out_exp, rc = -EINVAL);
2247 if (!export || !export->exp_obd)
2248 GOTO(out_exp, rc = -ENODEV);
/* temporary md sized for exactly one stripe, reused for every stripe */
2250 OBD_ALLOC(submd, lov_stripe_md_size(1));
2252 GOTO(out_exp, rc = -ENOMEM);
2254 lov = &export->exp_obd->u.lov;
2256 for (i = 0, loi = lsm->lsm_oinfo;
2257 i < lsm->lsm_stripe_count;
2259 if (lov->tgts[loi->loi_ost_idx].active == 0) {
2260 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
/* obd_start/obd_end receive the part of the range that falls in stripe i */
2264 if(!lov_stripe_intersects(lsm, i, start_off, end_off,
2265 &obd_start, &obd_end))
/* back to page indexes, now relative to the stripe's object */
2267 obd_start >>= PAGE_CACHE_SHIFT;
2268 obd_end >>= PAGE_CACHE_SHIFT;
2270 CDEBUG(D_INODE, "offs [%lu,%lu] => offs [%lu,%lu] stripe %d\n",
2271 start, end, (unsigned long)obd_start,
2272 (unsigned long)obd_end, loi->loi_ost_idx);
/* let the temporary md refer to the stripe's own loi_dirty_ot_inline */
2273 submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
2274 rc = obd_clear_dirty_pages(&lov->tgts[loi->loi_ost_idx].conn,
2275 submd, obd_start, obd_end,
2279 *cleared += osc_cleared;
/* NOTE(review): the early exits above can reach the cleanup with submd
 * still NULL; confirm the free below is skipped or tolerates that */
2283 OBD_FREE(submd, lov_stripe_md_size(1));
2284 class_export_put(export);
/* Query obd_last_dirty_offset() on the target of each stripe and convert
 * the page index it reports (relative to the stripe's object) into a page
 * index within the file; the result is returned through *offset. */
2288 static int lov_last_dirty_offset(struct lustre_handle *conn,
2289 struct lov_stripe_md *lsm,
2290 unsigned long *offset)
2292 struct obd_export *export = class_conn2export(conn);
2293 struct lov_stripe_md *submd = NULL;
2294 struct lov_obd *lov;
2295 struct lov_oinfo *loi;
2296 unsigned long tmp, count, skip;
2300 if (lsm_bad_magic(lsm))
2301 GOTO(out_exp, rc = -EINVAL);
2303 if (!export || !export->exp_obd)
2304 GOTO(out_exp, rc = -ENODEV);
/* temporary md sized for exactly one stripe, reused for every stripe */
2306 OBD_ALLOC(submd, lov_stripe_md_size(1));
2308 GOTO(out_exp, rc = -ENOMEM);
2311 lov = &export->exp_obd->u.lov;
2313 for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
/* count = pages per stripe chunk; skip = pages that belong to the other
 * stripes within one stripe width */
2316 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
2317 skip = (lsm->lsm_stripe_count - 1) * count;
/* let the temporary md refer to the stripe's own loi_dirty_ot_inline */
2319 submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
2321 err = obd_last_dirty_offset(&lov->tgts[loi->loi_ost_idx].conn,
/* NOTE(review): submd has been allocated by now; confirm that the out_exp
 * path releases it */
2326 GOTO(out_exp, rc = err);
/* object page index -> file page index: add the other stripes' pages for
 * every complete chunk already passed, plus the i chunks that precede this
 * stripe inside a stripe width */
2330 tmp += (tmp/count * skip) + (i * count);
2336 OBD_FREE(submd, lov_stripe_md_size(1));
2337 class_export_put(export);
/* Method table of the LOV driver; lov_init() passes it to
 * class_register_type().  Most entries use the old GNU "field:" initializer
 * syntax while the last three use C99 ".field =" designators; both forms
 * set the named member. */
2341 struct obd_ops lov_obd_ops = {
2342 o_owner: THIS_MODULE,
2343 o_attach: lov_attach,
2344 o_detach: lov_detach,
2346 o_connect: lov_connect,
2347 o_disconnect: lov_disconnect,
2348 o_statfs: lov_statfs,
2349 o_packmd: lov_packmd,
2350 o_unpackmd: lov_unpackmd,
2351 o_create: lov_create,
2352 o_destroy: lov_destroy,
2353 o_getattr: lov_getattr,
2354 o_getattr_async: lov_getattr_async,
2355 o_setattr: lov_setattr,
2359 o_brw_async: lov_brw_async,
2361 o_enqueue: lov_enqueue,
2363 o_cancel: lov_cancel,
2364 o_cancel_unused: lov_cancel_unused,
2365 o_iocontrol: lov_iocontrol,
2366 o_get_info: lov_get_info,
2367 .o_mark_page_dirty = lov_mark_page_dirty,
2368 .o_clear_dirty_pages = lov_clear_dirty_pages,
2369 .o_last_dirty_offset = lov_last_dirty_offset,
/* Module entry point: prints a banner, initialises the lprocfs variable
 * tables and registers the LOV device type (lov_obd_ops) under
 * OBD_LOV_DEVICENAME. */
2372 int __init lov_init(void)
2374 struct lprocfs_static_vars lvars;
2377 printk(KERN_INFO "Lustre Logical Object Volume driver; "
2378 "info@clusterfs.com\n");
/* fills lvars; its module_vars member is handed to the registration below */
2379 lprocfs_init_vars(&lvars);
2380 rc = class_register_type(&lov_obd_ops, lvars.module_vars,
2381 OBD_LOV_DEVICENAME);
/* Module exit point: unregisters the device type registered by lov_init(). */
2385 static void __exit lov_exit(void)
2387 class_unregister_type(OBD_LOV_DEVICENAME);
/* Kernel module metadata, and hookup of lov_init()/lov_exit() as the
 * module's init and exit routines. */
2391 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2392 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver");
2393 MODULE_LICENSE("GPL");
2395 module_init(lov_init);
2396 module_exit(lov_exit);