1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Copyright (C) 2002 Cluster File Systems, Inc.
7 * Author: Phil Schwan <phil@off.net>
8 * Peter Braam <braam@clusterfs.com>
9 * Mike Shaver <shaver@off.net>
11 * This code is issued under the GNU General Public License.
12 * See the file COPYING in this distribution
16 #define DEBUG_SUBSYSTEM S_LOV
18 #include <linux/slab.h>
19 #include <linux/module.h>
20 #include <linux/obd_support.h>
21 #include <linux/lustre_lib.h>
22 #include <linux/lustre_net.h>
23 #include <linux/lustre_idl.h>
24 #include <linux/lustre_lite.h> /* for LL_IOC_LOV_[GS]ETSTRIPE */
25 #include <linux/lustre_mds.h>
26 #include <linux/obd_class.h>
27 #include <linux/obd_lov.h>
28 #include <linux/init.h>
29 #include <linux/random.h>
30 #include <linux/slab.h>
31 #include <asm/div64.h>
32 #include <linux/lprocfs_status.h>
34 extern struct lprocfs_vars status_var_nm_1[];
35 extern struct lprocfs_vars status_class_var[];
37 static kmem_cache_t *lov_file_cache;
/* Bookkeeping for one file opened through the LOV.  lov_open() links the
 * record onto the export's led_open_head list through lfh_list and stores
 * one lustre_handle per stripe in the lfh_handles array. */
39 struct lov_file_handles {
40 struct list_head lfh_list;
43 struct lustre_handle *lfh_handles;
/* Container for the per-stripe lock handles of a multi-stripe lock.
 * llh_handles is a zero-length trailing array; lov_newlockh() sizes the
 * allocation so that lsm_stripe_count entries follow the struct. */
46 struct lov_lock_handles {
48 struct lustre_handle llh_handles[0];
51 extern int lov_packmd(struct lustre_handle *conn, struct lov_mds_md **lmm,
52 struct lov_stripe_md *lsm);
53 extern int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsm,
54 struct lov_mds_md *lmm);
55 extern int lov_setstripe(struct lustre_handle *conn,
56 struct lov_stripe_md **lsmp, struct lov_mds_md *lmmu);
57 extern int lov_getstripe(struct lustre_handle *conn, struct lov_mds_md *lmmu,
58 struct lov_stripe_md *lsm);
/* Attach hook: calls lprocfs_reg_obd() for the device with the
 * status_var_nm_1 table and returns its result.  len and data are not
 * used by the visible code. */
61 int lov_attach(struct obd_device *dev, obd_count len, void *data)
63 return lprocfs_reg_obd(dev, status_var_nm_1, dev);
/* Detach hook: calls lprocfs_dereg_obd() for the device and returns its
 * result. */
66 int lov_detach(struct obd_device *dev)
68 return lprocfs_dereg_obd(dev);
/* Connect a client to the LOV device.
 *
 * Visible steps: register the connection with class_connect(); when the
 * LOV already has other users (refcount > 1) the underlying connections
 * are not repeated.  Otherwise the export's open-file list and its lock
 * are initialised, the LOV descriptor and the target UUID array are
 * fetched from the MDS over a short-lived MDC connection, the reply is
 * validated, and each listed target is connected, registered through
 * IOC_OSC_REGISTER_LOV and marked active.
 *
 * Failures jump to out_disc or out_conn; the visible cleanup disconnects
 * targets that were already connected, frees lov->tgts and drops the
 * class connection. */
71 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
72 obd_uuid_t cluuid, struct recovd_obd *recovd,
73 ptlrpc_recovery_cb_t recover)
75 struct ptlrpc_request *req = NULL;
76 struct lov_obd *lov = &obd->u.lov;
77 struct client_obd *mdc = &lov->mdcobd->u.cli;
78 struct lov_desc *desc = &lov->desc;
79 struct obd_export *exp;
80 struct lustre_handle mdc_conn;
81 obd_uuid_t *uuidarray;
85 rc = class_connect(conn, obd, cluuid);
89 /* We don't want to actually do the underlying connections more than
90 * once, so keep track. */
92 if (lov->refcount > 1)
95 exp = class_conn2export(conn);
96 spin_lock_init(&exp->exp_lov_data.led_lock);
97 INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
99 /* retrieve LOV metadata from MDS */
100 rc = obd_connect(&mdc_conn, lov->mdcobd, NULL, recovd, recover);
102 CERROR("cannot connect to mdc: rc = %d\n", rc);
106 rc = mdc_getlovinfo(obd, &mdc_conn, &req);
/* The MDC connection is dropped right after the request; the results of
 * both calls (rc and rc2) are reported below. */
107 rc2 = obd_disconnect(&mdc_conn);
109 CERROR("cannot get lov info %d\n", rc);
114 CERROR("error disconnecting from MDS %d\n", rc2);
115 GOTO(out_conn, rc = rc2);
/* Reply buffer 0 must be large enough for a lov_desc and there must be
 * a second buffer (the UUID array, checked further down). */
119 if (req->rq_repmsg->bufcount < 2 ||
120 req->rq_repmsg->buflens[0] < sizeof(*desc)) {
121 CERROR("LOV desc: invalid descriptor returned\n");
122 GOTO(out_conn, rc = -EINVAL);
125 memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
126 lov_unpackdesc(desc);
128 if (req->rq_repmsg->buflens[1] < sizeof(*uuidarray)*desc->ld_tgt_count){
129 CERROR("LOV desc: invalid uuid array returned\n");
130 GOTO(out_conn, rc = -EINVAL);
133 if (memcmp(obd->obd_uuid, desc->ld_uuid, sizeof(desc->ld_uuid))) {
134 CERROR("LOV desc: uuid %s not on mds device (%s)\n",
135 obd->obd_uuid, desc->ld_uuid);
136 GOTO(out_conn, rc = -EINVAL);
139 if (desc->ld_tgt_count > 1000) {
140 CERROR("LOV desc: target count > 1000 (%d)\n",
142 GOTO(out_conn, rc = -EINVAL);
145 /* Because of 64-bit divide/mod operations only work with a 32-bit
146 * divisor in a 32-bit kernel, we cannot support a stripe width
147 * of 4GB or larger on 32-bit CPUs.
149 if ((desc->ld_default_stripe_count ?
150 desc->ld_default_stripe_count : desc->ld_tgt_count) *
151 desc->ld_default_stripe_size > ~0UL) {
152 CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
153 desc->ld_default_stripe_size,
154 desc->ld_default_stripe_count ?
155 desc->ld_default_stripe_count : desc->ld_tgt_count,~0UL);
156 GOTO(out_conn, rc = -EINVAL);
159 lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count;
160 OBD_ALLOC(lov->tgts, lov->bufsize);
162 CERROR("Out of memory\n");
163 GOTO(out_conn, rc = -ENOMEM);
166 uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
167 for (i = 0; i < desc->ld_tgt_count; i++)
168 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(*uuidarray));
170 for (i = 0; i < desc->ld_tgt_count; i++) {
171 struct obd_device *tgt = client_tgtuuid2obd(uuidarray[i]);
174 CERROR("Target %s not attached\n", uuidarray[i]);
175 GOTO(out_disc, rc = -EINVAL);
178 if (!(tgt->obd_flags & OBD_SET_UP)) {
179 CERROR("Target %s not set up\n", uuidarray[i]);
180 GOTO(out_disc, rc = -EINVAL);
183 rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd,
187 CERROR("Target %s connect error %d\n", uuidarray[i],
192 rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
193 sizeof(struct obd_device *), obd, NULL);
195 CERROR("Target %s REGISTER_LOV error %d\n",
200 desc->ld_active_tgt_count++;
201 lov->tgts[i].active = 1;
204 mdc->cl_max_mds_easize = obd_size_wiremd(conn, NULL);
207 ptlrpc_req_finished(req);
211 i--; /* skip failed-connect OSC */
213 desc->ld_active_tgt_count--;
214 lov->tgts[i].active = 0;
215 rc2 = obd_disconnect(&lov->tgts[i].conn);
217 CERROR("LOV Target %s disconnect error: rc = %d\n",
220 OBD_FREE(lov->tgts, lov->bufsize);
222 class_disconnect(conn);
/* Disconnect a client from the LOV device.
 *
 * The underlying targets are only torn down on the final disconnect
 * (checked through lov->refcount): each target is disconnected, active
 * targets are marked inactive with ld_active_tgt_count decremented, and
 * the target array is freed.  File handles still queued on the export's
 * open list are then discarded with an error message, and finally the
 * class connection is dropped. */
226 static int lov_disconnect(struct lustre_handle *conn)
228 struct obd_device *obd = class_conn2obd(conn);
229 struct lov_obd *lov = &obd->u.lov;
230 struct obd_export *exp;
231 struct list_head *p, *n;
237 /* Only disconnect the underlying layers on the final disconnect. */
239 if (lov->refcount != 0)
242 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
243 rc = obd_disconnect(&lov->tgts[i].conn);
245 if (lov->tgts[i].active) {
246 CERROR("Target %s disconnect error %d\n",
247 lov->tgts[i].uuid, rc);
251 if (lov->tgts[i].active) {
252 lov->desc.ld_active_tgt_count--;
253 lov->tgts[i].active = 0;
256 OBD_FREE(lov->tgts, lov->bufsize);
260 exp = class_conn2export(conn);
261 spin_lock(&exp->exp_lov_data.led_lock);
262 list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
263 /* XXX close these, instead of just discarding them? */
264 struct lov_file_handles *lfh;
265 lfh = list_entry(p, typeof(*lfh), lfh_list);
266 CERROR("discarding open LOV handle %p:"LPX64"\n",
267 lfh, lfh->lfh_cookie);
/* unlink the record, then free its handle array and the record itself */
268 list_del(&lfh->lfh_list);
269 OBD_FREE(lfh->lfh_handles,
270 lfh->lfh_count * sizeof(*lfh->lfh_handles));
271 kmem_cache_free(lov_file_cache, lfh);
273 spin_unlock(&exp->exp_lov_data.led_lock);
276 rc = class_disconnect(conn);
282 * -EINVAL : UUID can't be found in the LOV's target list
283 * -ENOTCONN: The UUID is found, but the target connection is bad (!)
284 * -EBADF : The UUID is found, but the OBD is the wrong type (!)
285 * -EALREADY: The OSC is already marked (in)active
/* Mark the OSC identified by uuid as active or inactive within this LOV.
 *
 * With lov_lock held, the target list is searched for a matching UUID;
 * the target's connection must resolve to an OBD of type "osc" and its
 * current state must differ from the requested one.  The active flag is
 * then updated and ld_active_tgt_count adjusted up or down.  The GOTO
 * sites show the failure codes: -EINVAL (uuid not in the list),
 * -ENOTCONN, -EBADF (OBD is not an osc) and -EALREADY (no change). */
287 static int lov_set_osc_active(struct lov_obd *lov, obd_uuid_t uuid,
290 struct obd_device *obd;
291 struct lov_tgt_desc *tgt;
295 CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
296 lov, uuid, activate);
298 spin_lock(&lov->lov_lock);
299 for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
300 CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
301 i, tgt->uuid, tgt->conn.addr);
302 if (strncmp(uuid, tgt->uuid, sizeof(tgt->uuid)) == 0)
306 if (i == lov->desc.ld_tgt_count)
307 GOTO(out, rc = -EINVAL);
309 obd = class_conn2obd(&tgt->conn);
312 GOTO(out, rc = -ENOTCONN);
315 CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LOV idx %d\n",
316 obd->obd_name, obd->obd_uuid, obd->obd_minor, obd,
317 obd->obd_type->typ_name, i);
318 if (strcmp(obd->obd_type->typ_name, "osc") != 0) {
320 GOTO(out, rc = -EBADF);
323 if (tgt->active == activate) {
324 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
325 activate ? "" : "in");
326 GOTO(out, rc = -EALREADY);
329 CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
331 tgt->active = activate;
336 * if (file_handle uses this_osc)
337 * if (has_no_filehandle)
338 * open(file_handle, this_osc);
341 lov->desc.ld_active_tgt_count++;
344 * Should I invalidate filehandles that refer to this OSC, so
345 * that I reopen them during reactivation?
347 /* XXX disconnect from OSC? */
348 lov->desc.ld_active_tgt_count--;
351 #warning "FIXME: walk open files list for objects that need opening"
354 spin_unlock(&lov->lov_lock);
/* Set up the LOV device from an obd_ioctl_data buffer.
 *
 * ioc_inlbuf1 carries the UUID of the MDC this LOV uses.  Its length
 * ioc_inllen1 must be between 1 and 37; the error text speaks of a
 * 36-character UUID, so the length presumably counts the terminating
 * NUL -- TODO confirm.  The LOV spinlock is initialised and the MDC is
 * looked up with class_uuid2obd(); a failed lookup is reported with
 * CERROR. */
358 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
360 struct obd_ioctl_data *data = buf;
361 struct lov_obd *lov = &obd->u.lov;
365 if (data->ioc_inllen1 < 1) {
366 CERROR("LOV setup requires an MDC UUID\n");
370 if (data->ioc_inllen1 > 37) {
371 CERROR("mdc UUID must be 36 characters or less\n");
375 spin_lock_init(&lov->lov_lock);
376 lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1);
378 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid,
/* Translate a lustre_handle into the lov_file_handles record it names.
 *
 * handle->addr is interpreted as the address of the record.  The checks
 * visible here are: handle non-NULL with a non-zero addr, the address
 * validating against lov_file_cache, and the record's lfh_cookie
 * matching handle->cookie. */
385 static struct lov_file_handles *lov_handle2lfh(struct lustre_handle *handle)
387 struct lov_file_handles *lfh = NULL;
389 if (!handle || !handle->addr)
392 lfh = (struct lov_file_handles *)(unsigned long)(handle->addr);
393 if (!kmem_cache_validate(lov_file_cache, lfh))
396 if (lfh->lfh_cookie != handle->cookie)
/* Create the sub-objects of a striped LOV object.
 *
 * oa->o_id must already hold the LOV object id (asserted through
 * OBD_MD_FLID).  A stripe descriptor is obtained with obd_alloc_memmd()
 * and a starting OST index is chosen: derived from the object id when no
 * descriptor was passed in or its stripe offset is out of range,
 * otherwise the requested lsm_stripe_offset.  The targets are then walked
 * round-robin from that index, skipping inactive ones, and an object is
 * created on each with a copy of the parent obdo.
 *
 * After the loop obj_alloc is compared with lsm_stripe_count; the visible
 * code contains both a jump to out_cleanup and a branch that copies the
 * descriptor into a smaller allocation sized for obj_alloc stripes.  The
 * cleanup path destroys the sub-objects already created and frees the
 * descriptor. */
402 /* the LOV expects oa->o_id to be set to the LOV object id */
403 static int lov_create(struct lustre_handle *conn, struct obdo *oa,
404 struct lov_stripe_md **ea)
406 struct obd_export *export = class_conn2export(conn);
408 struct lov_stripe_md *lsm;
409 struct lov_oinfo *loi;
411 int ost_count, ost_idx;
412 int first = 1, obj_alloc = 0;
421 lov = &export->exp_obd->u.lov;
423 if (!lov->desc.ld_active_tgt_count)
433 rc = obd_alloc_memmd(conn, &lsm);
438 lsm->lsm_magic = LOV_MAGIC;
441 ost_count = lov->desc.ld_tgt_count;
443 LASSERT(oa->o_valid & OBD_MD_FLID);
444 lsm->lsm_object_id = oa->o_id;
445 if (!lsm->lsm_stripe_size)
446 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
448 if (!*ea || lsm->lsm_stripe_offset >= ost_count) {
449 int mult = lsm->lsm_object_id * lsm->lsm_stripe_count;
450 int stripe_offset = mult % ost_count;
451 int sub_offset = (mult / ost_count) % lsm->lsm_stripe_count;
453 ost_idx = stripe_offset + sub_offset;
455 ost_idx = lsm->lsm_stripe_offset;
457 CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
458 lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
460 loi = lsm->lsm_oinfo;
/* visit every target at most once, wrapping around the target array */
461 for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
462 struct lov_stripe_md obj_md;
463 struct lov_stripe_md *obj_mdp = &obj_md;
466 if (lov->tgts[ost_idx].active == 0) {
467 CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
471 /* create data objects with "parent" OA */
472 memcpy(tmp, oa, sizeof(*tmp));
473 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
474 err = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp);
476 if (lov->tgts[ost_idx].active) {
477 CERROR("error creating objid "LPX64" sub-object"
478 "on OST idx %d: rc = %d\n",
479 oa->o_id, ost_idx, err);
485 loi->loi_id = tmp->o_id;
486 loi->loi_ost_idx = ost_idx;
487 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
488 lsm->lsm_object_id, loi->loi_id, ost_idx);
491 lsm->lsm_stripe_offset = ost_idx;
498 /* If we have allocated enough objects, we are OK */
499 if (obj_alloc == lsm->lsm_stripe_count) {
506 GOTO(out_cleanup, rc);
508 struct lov_stripe_md *lsm_new;
509 /* XXX LOV STACKING call into osc for sizes */
510 int size = lov_stripe_md_size(obj_alloc);
512 OBD_ALLOC(lsm_new, size);
514 GOTO(out_cleanup, rc = -ENOMEM);
515 memcpy(lsm_new, lsm, size);
516 /* XXX LOV STACKING call into osc for sizes */
517 OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count));
532 /* destroy already created objects here */
533 memcpy(tmp, oa, sizeof(*tmp));
534 tmp->o_id = loi->loi_id;
535 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
537 CERROR("Failed to uncreate objid "LPX64" subobj "
538 LPX64" on OST idx %d: rc = %d\n",
539 oa->o_id, loi->loi_id, loi->loi_ost_idx,
543 obd_free_memmd(conn, &lsm);
/* Destroy every sub-object of a striped LOV object.
 *
 * lsm must be supplied and carry LOV_MAGIC, and the connection must
 * resolve to an export with an OBD.  When the obdo carries a file handle
 * (OBD_MD_FLHANDLE) it is translated with lov_handle2lfh() so that the
 * per-stripe handle can be passed down; otherwise the handle flag is
 * cleared in the per-stripe obdo.  Stripes on inactive targets are
 * skipped.  A failed obd_destroy() on an active target is logged. */
547 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
548 struct lov_stripe_md *lsm)
551 struct obd_export *export = class_conn2export(conn);
553 struct lov_oinfo *loi;
554 struct lov_file_handles *lfh = NULL;
559 CERROR("LOV requires striping ea for destruction\n");
563 if (lsm->lsm_magic != LOV_MAGIC) {
564 CERROR("LOV striping magic bad %#x != %#x\n",
565 lsm->lsm_magic, LOV_MAGIC);
569 if (!export || !export->exp_obd)
572 if (oa->o_valid & OBD_MD_FLHANDLE)
573 lfh = lov_handle2lfh(obdo_handle(oa));
575 lov = &export->exp_obd->u.lov;
576 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
578 if (lov->tgts[loi->loi_ost_idx].active == 0) {
579 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
580 /* Orphan clean up will (someday) fix this up. */
584 memcpy(&tmp, oa, sizeof(tmp));
585 tmp.o_id = loi->loi_id;
587 memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
588 sizeof(lfh->lfh_handles[i]));
590 tmp.o_valid &= ~OBD_MD_FLHANDLE;
591 err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp,
593 if (err && lov->tgts[loi->loi_ost_idx].active) {
/* the newline terminates the message, after the rc value */
594 CERROR("Error destroying objid "LPX64" subobj "
595 LPX64" on OST idx %d: rc = %d\n",
596 oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
/* Convert the size of one stripe's object into the file size it implies.
 *
 * do_div() splits ost_size into a count of whole stripe-size chunks
 * (left in ost_size) and a remainder (stripe_size).  Two formulas are
 * visible: chunks * swidth + stripeno * ssize + remainder, and
 * (chunks - 1) * swidth + (stripeno + 1) * ssize.  Presumably the first
 * applies when the remainder is non-zero and the second when the object
 * ends exactly on a chunk boundary -- TODO confirm. */
604 /* compute object size given "stripeno" and the ost size */
605 static obd_size lov_stripe_size(struct lov_stripe_md *lsm, obd_size ost_size,
608 unsigned long ssize = lsm->lsm_stripe_size;
609 unsigned long swidth = ssize * lsm->lsm_stripe_count;
610 unsigned long stripe_size;
616 /* do_div(a, b) returns a % b, and a = a / b */
617 stripe_size = do_div(ost_size, ssize);
620 lov_size = ost_size * swidth + stripeno * ssize + stripe_size;
622 lov_size = (ost_size - 1) * swidth + (stripeno + 1) * ssize;
/* Fold the attributes of one stripe's object (src) into the aggregate
 * obdo for the whole file (tgt).
 *
 * Two paths are visible.  One copies the metadata with obdo_cpy_md() and
 * sets o_size from lov_stripe_size().  The other merges field by field:
 * o_size becomes the larger of its current value and this stripe's
 * lov_stripe_size(), o_blocks is summed, and o_ctime/o_mtime keep the
 * later timestamp.  Which path runs is presumably selected through *new
 * (first stripe versus later ones) -- TODO confirm. */
627 static void lov_merge_attrs(struct obdo *tgt, struct obdo *src, obd_flag valid,
628 struct lov_stripe_md *lsm, int stripeno, int *new)
631 obdo_cpy_md(tgt, src, valid);
632 if (valid & OBD_MD_FLSIZE)
633 tgt->o_size = lov_stripe_size(lsm,src->o_size,stripeno);
636 if (valid & OBD_MD_FLSIZE) {
637 /* this handles sparse files properly */
640 lov_size = lov_stripe_size(lsm, src->o_size, stripeno);
641 if (lov_size > tgt->o_size)
642 tgt->o_size = lov_size;
644 if (valid & OBD_MD_FLBLOCKS)
645 tgt->o_blocks += src->o_blocks;
646 if (valid & OBD_MD_FLCTIME && tgt->o_ctime < src->o_ctime)
647 tgt->o_ctime = src->o_ctime;
648 if (valid & OBD_MD_FLMTIME && tgt->o_mtime < src->o_mtime)
649 tgt->o_mtime = src->o_mtime;
/* Fetch the attributes of a striped object by querying each stripe.
 *
 * lsm must be supplied and carry LOV_MAGIC, and the connection must
 * resolve to an export with an OBD.  For every stripe on an active
 * target a copy of oa is made with o_id replaced by the sub-object id;
 * the per-stripe file handle is filled in when the caller's handle was
 * resolved by lov_handle2lfh(), otherwise OBD_MD_FLHANDLE is cleared.
 * Each successful obd_getattr() result is merged back into oa with
 * lov_merge_attrs(). */
653 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
654 struct lov_stripe_md *lsm)
657 struct obd_export *export = class_conn2export(conn);
659 struct lov_oinfo *loi;
660 struct lov_file_handles *lfh = NULL;
666 CERROR("LOV requires striping ea\n");
670 if (lsm->lsm_magic != LOV_MAGIC) {
671 CERROR("LOV striping magic bad %#x != %#x\n",
672 lsm->lsm_magic, LOV_MAGIC);
676 if (!export || !export->exp_obd)
679 lov = &export->exp_obd->u.lov;
681 if (oa->o_valid & OBD_MD_FLHANDLE)
682 lfh = lov_handle2lfh(obdo_handle(oa));
684 CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
685 lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
686 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
689 if (lov->tgts[loi->loi_ost_idx].active == 0) {
690 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
694 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
695 "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
696 /* create data objects with "parent" OA */
697 memcpy(&tmp, oa, sizeof(tmp));
698 tmp.o_id = loi->loi_id;
700 memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
701 sizeof(lfh->lfh_handles[i]));
703 tmp.o_valid &= ~OBD_MD_FLHANDLE;
705 err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL);
707 if (lov->tgts[loi->loi_ost_idx].active) {
708 CERROR("Error getattr objid "LPX64" subobj "
709 LPX64" on OST idx %d: rc = %d\n",
710 oa->o_id, loi->loi_id, loi->loi_ost_idx,
715 lov_merge_attrs(oa, &tmp, tmp.o_valid, lsm, i, &new);
/* Apply attribute changes to every stripe of a striped object.
 *
 * lsm must be supplied and carry LOV_MAGIC, and the connection must
 * resolve to an export with an OBD.  Size changes are not accepted here
 * (asserted: OBD_MD_FLSIZE must be clear).  For each stripe the metadata
 * in oa is copied into a scratch obdo, the per-stripe file handle is
 * filled in or OBD_MD_FLHANDLE cleared, o_id is set to the sub-object id
 * and obd_setattr() is called on the stripe's target; failures are
 * logged.  An existing comment in the body marks this code as currently
 * unused. */
722 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
723 struct lov_stripe_md *lsm)
726 struct obd_export *export = class_conn2export(conn);
728 struct lov_oinfo *loi;
729 struct lov_file_handles *lfh = NULL;
733 /* Note that this code is currently unused, hence LBUG(), just
734 * to know when/if it is ever revived that it needs cleanups.
739 CERROR("LOV requires striping ea\n");
743 if (lsm->lsm_magic != LOV_MAGIC) {
744 CERROR("LOV striping magic bad %#x != %#x\n",
745 lsm->lsm_magic, LOV_MAGIC);
749 if (!export || !export->exp_obd)
752 /* size changes should go through punch and not setattr */
753 LASSERT(!(oa->o_valid & OBD_MD_FLSIZE));
759 if (oa->o_valid & OBD_MD_FLHANDLE)
760 lfh = lov_handle2lfh(obdo_handle(oa));
762 lov = &export->exp_obd->u.lov;
763 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
766 obdo_cpy_md(tmp, oa, oa->o_valid);
769 memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
770 sizeof(lfh->lfh_handles[i]));
772 tmp->o_valid &= ~OBD_MD_FLHANDLE;
774 tmp->o_id = loi->loi_id;
776 err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
778 CERROR("Error setattr objid "LPX64" subobj "LPX64
779 " on OST idx %d: rc = %d\n",
780 oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
/* Open a striped object by opening the sub-object of every stripe.
 *
 * A lov_file_handles record is taken from lov_file_cache together with
 * an array of one handle per stripe.  Each stripe on an active target is
 * opened with a copy of oa whose o_id is the sub-object id; the returned
 * attributes are merged into oa and the returned handle, when present,
 * is stored in the record.  On success the record gets a random cookie,
 * its address and cookie are written into oa's handle, OBD_MD_FLHANDLE
 * is set and the record is added to the export's open list under
 * led_lock.
 *
 * The error path walks back over the stripes already visited, closes
 * those on active targets, and frees the handle array and the record
 * after poisoning its cookie with DEAD_HANDLE_MAGIC. */
789 static int lov_open(struct lustre_handle *conn, struct obdo *oa,
790 struct lov_stripe_md *lsm)
792 struct obdo *tmp; /* on the heap here, on the stack in lov_close? */
793 struct obd_export *export = class_conn2export(conn);
795 struct lov_oinfo *loi;
796 struct lov_file_handles *lfh = NULL;
797 struct lustre_handle *handle;
803 CERROR("LOV requires striping ea for opening\n");
807 if (lsm->lsm_magic != LOV_MAGIC) {
808 CERROR("LOV striping magic bad %#x != %#x\n",
809 lsm->lsm_magic, LOV_MAGIC);
813 if (!export || !export->exp_obd)
820 lfh = kmem_cache_alloc(lov_file_cache, GFP_KERNEL);
822 GOTO(out_tmp, rc = -ENOMEM);
823 OBD_ALLOC(lfh->lfh_handles,
824 lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
825 if (!lfh->lfh_handles)
826 GOTO(out_lfh, rc = -ENOMEM);
828 lov = &export->exp_obd->u.lov;
831 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
833 if (lov->tgts[loi->loi_ost_idx].active == 0) {
834 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
838 /* create data objects with "parent" OA */
839 memcpy(tmp, oa, sizeof(*tmp));
840 tmp->o_id = loi->loi_id;
842 rc = obd_open(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
844 if (lov->tgts[loi->loi_ost_idx].active) {
845 CERROR("Error open objid "LPX64" subobj "LPX64
846 " on OST idx %d: rc = %d\n",
847 oa->o_id, lsm->lsm_oinfo[i].loi_id,
848 loi->loi_ost_idx, rc);
854 lov_merge_attrs(oa, tmp, tmp->o_valid, lsm, i, &new);
856 if (tmp->o_valid & OBD_MD_FLHANDLE)
857 memcpy(&lfh->lfh_handles[i], obdo_handle(tmp),
858 sizeof(lfh->lfh_handles[i]));
861 handle = obdo_handle(oa);
863 lfh->lfh_count = lsm->lsm_stripe_count;
864 get_random_bytes(&lfh->lfh_cookie, sizeof(lfh->lfh_cookie));
/* the handle given back to the caller is the record's address + cookie */
866 handle->addr = (__u64)(unsigned long)lfh;
867 handle->cookie = lfh->lfh_cookie;
868 oa->o_valid |= OBD_MD_FLHANDLE;
869 spin_lock(&export->exp_lov_data.led_lock);
870 list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
871 spin_unlock(&export->exp_lov_data.led_lock);
878 for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
881 if (lov->tgts[loi->loi_ost_idx].active == 0)
884 memcpy(tmp, oa, sizeof(*tmp));
885 tmp->o_id = loi->loi_id;
886 memcpy(obdo_handle(tmp), &lfh->lfh_handles[i],
887 sizeof(lfh->lfh_handles[i]));
889 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, tmp, NULL);
891 CERROR("Error closing objid "LPX64" subobj "LPX64
892 " on OST idx %d after open error: rc = %d\n",
893 oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
897 OBD_FREE(lfh->lfh_handles,
898 lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
900 lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
901 kmem_cache_free(lov_file_cache, lfh);
/* Close a striped object by closing the sub-object of every stripe.
 *
 * lsm must be supplied and carry LOV_MAGIC, and the connection must
 * resolve to an export with an OBD.  The caller's handle, when flagged
 * in oa, is resolved with lov_handle2lfh().  Each stripe on an active
 * target is closed with a copy of oa carrying the sub-object id and
 * either the per-stripe handle or a cleared OBD_MD_FLHANDLE; failures
 * are logged.  Finally the record is unlinked from the open list, its
 * handle array is freed, its cookie is poisoned with DEAD_HANDLE_MAGIC
 * and it is returned to lov_file_cache. */
905 static int lov_close(struct lustre_handle *conn, struct obdo *oa,
906 struct lov_stripe_md *lsm)
909 struct obd_export *export = class_conn2export(conn);
911 struct lov_oinfo *loi;
912 struct lov_file_handles *lfh = NULL;
917 CERROR("LOV requires striping ea\n");
921 if (lsm->lsm_magic != LOV_MAGIC) {
922 CERROR("LOV striping magic bad %#x != %#x\n",
923 lsm->lsm_magic, LOV_MAGIC);
927 if (!export || !export->exp_obd)
930 if (oa->o_valid & OBD_MD_FLHANDLE)
931 lfh = lov_handle2lfh(obdo_handle(oa));
933 lov = &export->exp_obd->u.lov;
934 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
937 if (lov->tgts[loi->loi_ost_idx].active == 0) {
938 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
942 /* create data objects with "parent" OA */
943 memcpy(&tmp, oa, sizeof(tmp));
944 tmp.o_id = loi->loi_id;
946 memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
947 sizeof(lfh->lfh_handles[i]));
949 tmp.o_valid &= ~OBD_MD_FLHANDLE;
951 err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL);
953 CERROR("Error close objid "LPX64" subobj "LPX64
954 " on OST idx %d: rc = %d\n",
955 oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
961 list_del(&lfh->lfh_list);
962 OBD_FREE(lfh->lfh_handles,
963 lsm->lsm_stripe_count * sizeof(*lfh->lfh_handles));
964 lfh->lfh_cookie = DEAD_HANDLE_MAGIC;
965 kmem_cache_free(lov_file_cache, lfh);
/* log2(n): the bit index ffz() reports for the complement of n.
 * NOTE(review): presumably only meaningful for power-of-two n -- confirm. */
972 #define log2(n) ffz(~(n))
975 #warning FIXME: merge these two functions now that they are nearly the same
/* Map the file offset lov_off to an offset inside the object that backs
 * stripe "stripeno".
 *
 * Offsets of 0 and OBD_OBJECT_EOF are special-cased before any
 * arithmetic.  Otherwise do_div() splits lov_off into a count of whole
 * stripe widths (left in lov_off) and a remainder within the current
 * width.  The remainder is compared with the start of this stripe's
 * chunk (stripeno * ssize), reduced by it and checked against the stripe
 * size; the result is whole_widths * ssize plus the adjusted remainder. */
977 /* compute ost offset in stripe "stripeno" corresponding to offset "lov_off" */
978 static obd_off lov_stripe_offset(struct lov_stripe_md *lsm, obd_off lov_off,
981 unsigned long ssize = lsm->lsm_stripe_size;
982 unsigned long swidth = ssize * lsm->lsm_stripe_count;
983 unsigned long stripe_off, this_stripe;
985 if (lov_off == OBD_OBJECT_EOF || lov_off == 0)
988 /* do_div(a, b) returns a % b, and a = a / b */
989 stripe_off = do_div(lov_off, swidth);
991 this_stripe = stripeno * ssize;
992 if (stripe_off <= this_stripe)
995 stripe_off -= this_stripe;
997 if (stripe_off > ssize)
1002 return lov_off * ssize + stripe_off;
/* Return the index of the stripe that file offset lov_off falls into:
 * the offset's remainder within one full stripe width (taken with
 * do_div()), divided by the stripe size. */
1005 /* compute which stripe number "lov_off" will be written into */
1006 static int lov_stripe_number(struct lov_stripe_md *lsm, obd_off lov_off)
1008 unsigned long ssize = lsm->lsm_stripe_size;
1009 unsigned long swidth = ssize * lsm->lsm_stripe_count;
1010 unsigned long stripe_off;
1012 stripe_off = do_div(lov_off, swidth);
1014 return stripe_off / ssize;
/* Punch (truncate) the byte range [start, end] of a striped object.
 *
 * lsm must be supplied and carry LOV_MAGIC, and the connection must
 * resolve to an export with an OBD.  For each stripe the range is
 * translated into that stripe's object with lov_stripe_offset(), a copy
 * of oa is prepared with the sub-object id and either the per-stripe
 * file handle or a cleared OBD_MD_FLHANDLE, and obd_punch() is called on
 * the stripe's target; failures are logged. */
1018 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
1019 * we can send this 'punch' to just the authoritative node and the nodes
1020 * that the punch will affect. */
1021 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
1022 struct lov_stripe_md *lsm,
1023 obd_off start, obd_off end)
1026 struct obd_export *export = class_conn2export(conn);
1027 struct lov_obd *lov;
1028 struct lov_oinfo *loi;
1029 struct lov_file_handles *lfh = NULL;
1034 CERROR("LOV requires striping ea\n");
1038 if (lsm->lsm_magic != LOV_MAGIC) {
1039 CERROR("LOV striping magic bad %#x != %#x\n",
1040 lsm->lsm_magic, LOV_MAGIC);
1044 if (!export || !export->exp_obd)
1047 if (oa->o_valid & OBD_MD_FLHANDLE)
1048 lfh = lov_handle2lfh(obdo_handle(oa));
1050 lov = &export->exp_obd->u.lov;
1051 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1052 obd_off starti = lov_stripe_offset(lsm, start, i);
1053 obd_off endi = lov_stripe_offset(lsm, end, i);
1059 /* create data objects with "parent" OA */
1060 memcpy(&tmp, oa, sizeof(tmp));
1061 tmp.o_id = loi->loi_id;
1063 memcpy(obdo_handle(&tmp), &lfh->lfh_handles[i],
1064 sizeof(lfh->lfh_handles[i]));
1066 tmp.o_valid &= ~OBD_MD_FLHANDLE;
1068 err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL,
1071 CERROR("Error punch objid "LPX64" subobj "LPX64
1072 " on OST idx %d: rc = %d\n",
1073 oa->o_id, loi->loi_id, loi->loi_ost_idx, err);
/* Split a bulk read/write on a striped object into per-stripe requests.
 *
 * Every page in pga is assigned to a stripe with lov_stripe_number() and
 * counted.  The per-stripe counts give each stripe a contiguous slice of
 * the scratch array ioarr (si->index is the running sum of the previous
 * stripes' counts).  The pages are then copied into their slice with the
 * offset translated by lov_stripe_offset(), and obd_brw() is issued per
 * stripe against that stripe's target.  The three scratch allocations
 * are released at the end.
 *
 * NOTE(review): stripe_count is initialised from lsm->lsm_stripe_count
 * ahead of the "LOV requires striping ea" diagnostic; if that diagnostic
 * guards against a NULL lsm, the dereference happens too early -- confirm.
 * NOTE(review): unlike the sibling entry points, no
 * "!export || !export->exp_obd" check is visible before export->exp_obd
 * is used -- confirm. */
1081 static inline int lov_brw(int cmd, struct lustre_handle *conn,
1082 struct lov_stripe_md *lsm, obd_count oa_bufs,
1083 struct brw_page *pga, struct obd_brw_set *set)
1089 struct lov_stripe_md lsm;
1091 } *stripeinfo, *si, *si_last;
1092 struct obd_export *export = class_conn2export(conn);
1093 struct lov_obd *lov;
1094 struct brw_page *ioarr;
1095 struct lov_oinfo *loi;
1096 int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
1100 CERROR("LOV requires striping ea\n");
1104 if (lsm->lsm_magic != LOV_MAGIC) {
1105 CERROR("LOV striping magic bad %#x != %#x\n",
1106 lsm->lsm_magic, LOV_MAGIC);
1110 lov = &export->exp_obd->u.lov;
1112 OBD_ALLOC(stripeinfo, stripe_count * sizeof(*stripeinfo));
1114 GOTO(out_cbdata, rc = -ENOMEM);
1116 OBD_ALLOC(where, sizeof(*where) * oa_bufs);
1118 GOTO(out_sinfo, rc = -ENOMEM);
1120 OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs);
1122 GOTO(out_where, rc = -ENOMEM);
/* pass 1: remember each page's stripe and count pages per stripe */
1124 for (i = 0; i < oa_bufs; i++) {
1125 where[i] = lov_stripe_number(lsm, pga[i].off);
1126 stripeinfo[where[i]].bufct++;
/* pass 2: derive each stripe's starting slot in ioarr and record its
 * sub-object id and target index */
1129 for (i = 0, loi = lsm->lsm_oinfo, si_last = si = stripeinfo;
1130 i < stripe_count; i++, loi++, si_last = si, si++) {
1132 si->index = si_last->index + si_last->bufct;
1133 si->lsm.lsm_object_id = loi->loi_id;
1134 si->ost_idx = loi->loi_ost_idx;
/* pass 3: place every page in its stripe's slice with a translated offset */
1137 for (i = 0; i < oa_bufs; i++) {
1138 int which = where[i];
1141 shift = stripeinfo[which].index + stripeinfo[which].subcount;
1142 LASSERT(shift < oa_bufs);
1143 ioarr[shift] = pga[i];
1144 ioarr[shift].off = lov_stripe_offset(lsm, pga[i].off, which);
1145 stripeinfo[which].subcount++;
1148 for (i = 0, si = stripeinfo; i < stripe_count; i++, si++) {
1149 int shift = si->index;
1152 LASSERT(shift < oa_bufs);
1153 rc = obd_brw(cmd, &lov->tgts[si->ost_idx].conn,
1154 &si->lsm, si->bufct, &ioarr[shift], set);
1156 GOTO(out_ioarr, rc);
1161 OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs);
1163 OBD_FREE(where, sizeof(*where) * oa_bufs);
1165 OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo));
/* Allocate a lov_lock_handles container with room for one lustre_handle
 * per stripe of lsm after the struct, and fill its llh_cookie with
 * random bytes. */
1170 static struct lov_lock_handles *lov_newlockh(struct lov_stripe_md *lsm)
1172 struct lov_lock_handles *lov_lockh;
1174 OBD_ALLOC(lov_lockh, sizeof(*lov_lockh) +
1175 sizeof(*lov_lockh->llh_handles) * lsm->lsm_stripe_count);
1179 get_random_bytes(&lov_lockh->llh_cookie, sizeof(lov_lockh->llh_cookie));
1184 /* We are only ever passed local lock handles here, so we do not need to
1185 * validate (and we can't really because these structs are variable sized
1186 * and therefore alloced, and not from a private slab).
1188 * We just check because we can...
/* Translate a lustre_handle into the lov_lock_handles container it names.
 * handle->addr is interpreted as the container's address; the checks
 * visible here are a non-NULL handle with a non-zero addr and a matching
 * cookie. */
1190 static struct lov_lock_handles *lov_h2lovlockh(struct lustre_handle *handle)
1192 struct lov_lock_handles *lov_lockh = NULL;
1194 if (!handle || !handle->addr)
1197 lov_lockh = (struct lov_lock_handles *)(unsigned long)(handle->addr);
1198 if (lov_lockh->llh_cookie != handle->cookie)
/* Lock a striped object by enqueuing a lock on each of its stripes.
 *
 * lsm must be supplied and carry LOV_MAGIC; replay requests are asserted
 * never to arrive here.  For objects with more than one stripe a
 * lov_lock_handles container is allocated, its address and cookie are
 * stored in lockh, and the per-stripe handles live in its llh_handles
 * array.  For every stripe on an active target the extent passed in
 * cookie is translated with lov_stripe_offset() and obd_enqueue() is
 * called on that stripe's target.
 *
 * The error path walks back over the stripes already handled, cancels
 * the locks that were obtained on active targets, frees the container in
 * the multi-stripe case and poisons lockh->cookie with
 * DEAD_HANDLE_MAGIC. */
1204 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1205 struct lustre_handle *parent_lock,
1206 __u32 type, void *cookie, int cookielen, __u32 mode,
1207 int *flags, void *cb, void *data, int datalen,
1208 struct lustre_handle *lockh)
1210 struct obd_export *export = class_conn2export(conn);
1211 struct lov_lock_handles *lov_lockh = NULL;
1212 struct lustre_handle *lov_lockhp;
1213 struct lov_obd *lov;
1214 struct lov_oinfo *loi;
1215 struct lov_stripe_md submd;
1220 CERROR("LOV requires striping ea\n");
1224 if (lsm->lsm_magic != LOV_MAGIC) {
1225 CERROR("LOV striping magic bad %#x != %#x\n",
1226 lsm->lsm_magic, LOV_MAGIC);
1230 /* we should never be asked to replay a lock. */
1232 LASSERT((*flags & LDLM_FL_REPLAY) == 0);
1234 if (!export || !export->exp_obd)
1237 if (lsm->lsm_stripe_count > 1) {
1238 lov_lockh = lov_newlockh(lsm);
1242 lockh->addr = (__u64)(unsigned long)lov_lockh;
1243 lockh->cookie = lov_lockh->llh_cookie;
1244 lov_lockhp = lov_lockh->llh_handles;
1248 lov = &export->exp_obd->u.lov;
1249 for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1250 i++, loi++, lov_lockhp++) {
1251 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
1252 struct ldlm_extent sub_ext;
1254 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1255 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1260 sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
1261 sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
1262 if (sub_ext.start == sub_ext.end /* || !active */)
1265 /* XXX LOV STACKING: submd should be from the subobj */
1266 submd.lsm_object_id = loi->loi_id;
1267 submd.lsm_stripe_count = 0;
1268 /* XXX submd is not fully initialized here */
1270 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
1271 parent_lock, type, &sub_ext, sizeof(sub_ext),
1272 mode, flags, cb, data, datalen, lov_lockhp);
1273 // XXX add a lock debug statement here
1275 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
1276 if (rc && lov->tgts[loi->loi_ost_idx].active) {
1277 CERROR("Error enqueue objid "LPX64" subobj "LPX64
1278 " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1279 loi->loi_id, loi->loi_ost_idx, rc);
/* unwind: step back through the stripes handled before the failure */
1286 while (loi--, lov_lockhp--, i-- > 0) {
1287 struct lov_stripe_md submd;
1290 if (lov_lockhp->addr == 0 ||
1291 lov->tgts[loi->loi_ost_idx].active == 0)
1294 /* XXX LOV STACKING: submd should be from the subobj */
1295 submd.lsm_object_id = loi->loi_id;
1296 submd.lsm_stripe_count = 0;
1297 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1300 CERROR("Error cancelling objid "LPX64
1301 " on OST idx %d after enqueue error: rc = %d\n",
1302 loi->loi_id, loi->loi_ost_idx, err);
1306 if (lsm->lsm_stripe_count > 1) {
1307 lov_lockh->llh_cookie = DEAD_HANDLE_MAGIC;
1308 OBD_FREE(lov_lockh, sizeof(*lov_lockh) +
1309 sizeof(*lov_lockh->llh_handles) *
1310 lsm->lsm_stripe_count);
1313 lockh->cookie = DEAD_HANDLE_MAGIC;
/* Release a lock taken with lov_enqueue() by cancelling it on each stripe.
 *
 * lsm must be supplied and carry LOV_MAGIC, and the connection must
 * resolve to an export with an OBD.  In the multi-stripe case lockh is
 * resolved to its lov_lock_handles container with lov_h2lovlockh() and
 * the per-stripe handles are taken from llh_handles.  Stripes whose
 * handle has a zero addr are reported at D_HA level; for the others
 * obd_cancel() is called on the stripe's target and failures on active
 * targets are logged.  Afterwards the container is poisoned and freed
 * (multi-stripe case) and lockh->cookie is set to DEAD_HANDLE_MAGIC. */
1318 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
1319 __u32 mode, struct lustre_handle *lockh)
1321 struct obd_export *export = class_conn2export(conn);
1322 struct lov_lock_handles *lov_lockh = NULL;
1323 struct lustre_handle *lov_lockhp;
1324 struct lov_obd *lov;
1325 struct lov_oinfo *loi;
1330 CERROR("LOV requires striping ea\n");
1334 if (lsm->lsm_magic != LOV_MAGIC) {
1335 CERROR("LOV striping magic bad %#x != %#x\n",
1336 lsm->lsm_magic, LOV_MAGIC);
1340 if (!export || !export->exp_obd)
1344 if (lsm->lsm_stripe_count > 1) {
1345 lov_lockh = lov_h2lovlockh(lockh);
1347 CERROR("LOV: invalid lov lock handle %p\n", lockh);
1351 lov_lockhp = lov_lockh->llh_handles;
1355 lov = &export->exp_obd->u.lov;
1356 for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
1357 i++, loi++, lov_lockhp++ ) {
1358 struct lov_stripe_md submd;
1361 if (lov_lockhp->addr == 0) {
1362 CDEBUG(D_HA, "lov idx %d no lock?\n", loi->loi_ost_idx);
1366 /* XXX LOV STACKING: submd should be from the subobj */
1367 submd.lsm_object_id = loi->loi_id;
1368 submd.lsm_stripe_count = 0;
1369 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
1372 if (lov->tgts[loi->loi_ost_idx].active) {
1373 CERROR("Error cancel objid "LPX64" subobj "
1374 LPX64" on OST idx %d: rc = %d\n",
1376 loi->loi_id, loi->loi_ost_idx, err);
1383 if (lsm->lsm_stripe_count > 1) {
1384 lov_lockh->llh_cookie = DEAD_HANDLE_MAGIC;
1385 OBD_FREE(lov_lockh, sizeof(*lov_lockh) +
1386 sizeof(*lov_lockh->llh_handles) *
1387 lsm->lsm_stripe_count);
1390 lockh->cookie = DEAD_HANDLE_MAGIC;
/* Cancel unused locks on every stripe of a striped object.
 *
 * lsm must be supplied and the connection must resolve to an export with
 * an OBD.  For each stripe a minimal lov_stripe_md holding only the
 * sub-object id (stripe count 0) is passed to obd_cancel_unused() on the
 * stripe's target; a failure on an active target is logged. */
1395 static int lov_cancel_unused(struct lustre_handle *conn,
1396 struct lov_stripe_md *lsm, int flags)
1398 struct obd_export *export = class_conn2export(conn);
1399 struct lov_obd *lov;
1400 struct lov_oinfo *loi;
1405 CERROR("LOV requires striping ea for lock cancellation\n");
1409 if (!export || !export->exp_obd)
1412 lov = &export->exp_obd->u.lov;
1413 for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
1414 struct lov_stripe_md submd;
1417 submd.lsm_object_id = loi->loi_id;
1418 submd.lsm_stripe_count = 0;
1419 err = obd_cancel_unused(&lov->tgts[loi->loi_ost_idx].conn,
1421 if (err && lov->tgts[loi->loi_ost_idx].active) {
1422 CERROR("Error cancel unused objid "LPX64" subobj "LPX64
1423 " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
1424 loi->loi_id, loi->loi_ost_idx, err);
/*
 * Gather statfs data from the LOV's targets into @osfs.
 *
 * Iterates over lov->desc.ld_tgt_count targets and queries each one with
 * obd_statfs() into the local lov_sfs.  Two ways of folding a result
 * into @osfs are visible: a memcpy of the whole record, and adding
 * os_bfree, os_bavail and os_blocks to the running totals.
 * A target not marked active produces a D_HA debug message; on the
 * obd_statfs() error path a CERROR is emitted and the loop continues
 * with the next target.
 *
 * NOTE(review): this listing has gaps in its original line numbers.  The
 * conditions that choose between the memcpy and the summing are not
 * shown - presumably the first successful target is copied and later
 * ones are summed; confirm against the complete file.
 */
1433 static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
1435 struct obd_export *export = class_conn2export(conn);
1436 struct lov_obd *lov;
1437 struct obd_statfs lov_sfs;
/* The export and its obd device are both checked before u.lov is used. */
1443 if (!export || !export->exp_obd)
1446 lov = &export->exp_obd->u.lov;
1448 /* We only get block data from the OBD */
1449 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
/* NOTE(review): presumably an inactive target is skipped after the
 * debug message; the skip statement is not visible here. */
1452 if (!lov->tgts[i].active) {
1453 CDEBUG(D_HA, "lov idx %d inactive\n", i);
/* Per-target query; the answer lands in the local lov_sfs. */
1457 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
1459 CERROR("Error statfs OSC %s i %d: err = %d\n",
1460 lov->tgts[i].uuid, i, err);
1463 continue; /* XXX or break? - probably OK to continue */
/* Whole-record copy of one target's result into osfs. */
1466 memcpy(osfs, &lov_sfs, sizeof(lov_sfs));
/* Block counters are accumulated across targets. */
1469 osfs->os_bfree += lov_sfs.os_bfree;
1470 osfs->os_bavail += lov_sfs.os_bavail;
1471 osfs->os_blocks += lov_sfs.os_blocks;
1472 /* XXX not sure about this one - depends on policy.
1473 * - could be minimum if we always stripe on all OBDs
1474 * (but that would be wrong for any other policy,
1475 * if one of the OBDs has no more objects left)
1476 * - could be sum if we stripe whole objects
1477 * - could be average, just to give a nice number
1478 * - we just pick first OST and hope it is enough
1479 sfs->f_ffree += lov_sfs.f_ffree;
/*
 * ioctl entry point for a LOV device.
 *
 * Commands visible in this listing:
 *   IOC_LOV_SET_OSC_ACTIVE - hands data->ioc_inlbuf1 and data->ioc_offset
 *                            of the kernel-side ioctl data (@karg) to
 *                            lov_set_osc_active().
 *   OBD_IOC_LOV_GET_CONFIG - fetches the ioctl data from @uarg with
 *                            obd_ioctl_getdata(), compares the sizes of
 *                            inline buffers 1 and 2 against a struct
 *                            lov_desc and @count uuids, copies lov->desc
 *                            and every target's uuid into those buffers,
 *                            then copies the buffer back to @uarg.
 *   LL_IOC_LOV_SETSTRIPE   - lov_setstripe(conn, karg, uarg).
 *   LL_IOC_LOV_GETSTRIPE   - lov_getstripe(conn, karg, uarg).
 * A loop calling obd_iocontrol() with the same @cmd on the connection of
 * each of the @count targets is also visible.
 * NOTE(review): presumably that loop is the default case - confirm.
 *
 * NOTE(review): this listing has gaps in its original line numbers; the
 * switch header, the error branches of the size checks, the release of
 * buf and the return path are not shown.
 * NOTE(review): obddev is dereferenced right after class_conn2obd() with
 * no visible NULL check.
 * NOTE(review): in the Linux kernel copy_to_user() reports the number of
 * bytes it could not copy, not an errno - verify that rc is converted
 * on a line not shown here.
 */
1486 static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
1487 void *karg, void *uarg)
1489 struct obd_device *obddev = class_conn2obd(conn);
1490 struct lov_obd *lov = &obddev->u.lov;
1491 int i, count = lov->desc.ld_tgt_count;
1497 case IOC_LOV_SET_OSC_ACTIVE: {
1498 struct obd_ioctl_data *data = karg;
1499 rc = lov_set_osc_active(lov,data->ioc_inlbuf1,data->ioc_offset);
1502 case OBD_IOC_LOV_GET_CONFIG: {
1503 struct obd_ioctl_data *data = karg;
1504 struct lov_tgt_desc *tgtdesc;
1505 struct lov_desc *desc;
1511 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
/* From here on data refers to the buffer filled by obd_ioctl_getdata(). */
1514 data = (struct obd_ioctl_data *)buf;
/* Size checks: inline buffer 1 against one lov_desc, inline buffer 2
 * against one uuid per target. */
1516 if (sizeof(*desc) > data->ioc_inllen1) {
1521 if (sizeof(*uuidp) * count > data->ioc_inllen2) {
1526 desc = (struct lov_desc *)data->ioc_inlbuf1;
1527 uuidp = (obd_uuid_t *)data->ioc_inlbuf2;
1528 memcpy(desc, &(lov->desc), sizeof(*desc));
/* One uuid per target, in target-array order. */
1530 tgtdesc = lov->tgts;
1531 for (i = 0; i < count; i++, uuidp++, tgtdesc++)
1532 memcpy(uuidp, tgtdesc->uuid, sizeof(*uuidp));
1534 rc = copy_to_user((void *)uarg, buf, len);
1540 case LL_IOC_LOV_SETSTRIPE:
1541 rc = lov_setstripe(conn, karg, uarg);
1543 case LL_IOC_LOV_GETSTRIPE:
1544 rc = lov_getstripe(conn, karg, uarg);
/* Same command issued to every target connection in turn. */
1550 for (i = 0; i < count; i++) {
1553 err = obd_iocontrol(cmd, &lov->tgts[i].conn,
/*
 * Method table of the LOV obd type; lov_init() passes its address to
 * class_register_type().  Each o_* slot is bound to the lov_* function
 * of the same name.
 *
 * NOTE(review): this listing has gaps in its original line numbers, so
 * further initialisers and the closing brace are not shown here.
 */
1563 struct obd_ops lov_obd_ops = {
1564 o_owner: THIS_MODULE,
1565 o_attach: lov_attach,
1566 o_detach: lov_detach,
1568 o_connect: lov_connect,
1569 o_disconnect: lov_disconnect,
1570 o_statfs: lov_statfs,
1571 o_packmd: lov_packmd,
1572 o_unpackmd: lov_unpackmd,
1573 o_create: lov_create,
1574 o_destroy: lov_destroy,
1575 o_getattr: lov_getattr,
1576 o_setattr: lov_setattr,
1581 o_enqueue: lov_enqueue,
1582 o_cancel: lov_cancel,
1583 o_cancel_unused: lov_cancel_unused,
1584 o_iocontrol: lov_iocontrol
/* Version string pasted into the load-time banner and MODULE_DESCRIPTION. */
1588 #define LOV_VERSION "v0.1"
/*
 * Module load hook (wired up through module_init()).
 *
 * Prints the driver banner, creates the "ll_lov_file_data" slab cache
 * sized for struct lov_file_handles, and registers the LOV obd type
 * with class_register_type() under OBD_LOV_DEVICENAME.
 *
 * NOTE(review): this listing has gaps in its original line numbers; the
 * declaration of rc, the remaining kmem_cache_create() arguments and the
 * return statements are not shown.
 * NOTE(review): no release of lov_file_cache is visible for the case
 * where class_register_type() fails - confirm the cache is not leaked
 * on that path.
 */
1590 static int __init lov_init(void)
1593 printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
1594 ", info@clusterfs.com\n");
1595 lov_file_cache = kmem_cache_create("ll_lov_file_data",
1596 sizeof(struct lov_file_handles),
/* Cache creation failure is detected here; the action taken is on a
 * line not shown. */
1598 if (!lov_file_cache)
1601 rc = class_register_type(&lov_obd_ops, status_class_var,
1602 OBD_LOV_DEVICENAME);
/*
 * Module unload hook (wired up through module_exit()).
 *
 * Destroys lov_file_cache, logging through CERROR when
 * kmem_cache_destroy() returns non-zero, and then unregisters the
 * OBD_LOV_DEVICENAME obd type.
 */
1606 static void __exit lov_exit(void)
1608 if (kmem_cache_destroy(lov_file_cache))
1609 CERROR("couldn't free LOV open cache\n");
1610 class_unregister_type(OBD_LOV_DEVICENAME);
/* Module metadata, followed by the load/unload entry point bindings. */
1613 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1614 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver " LOV_VERSION);
1615 MODULE_LICENSE("GPL");
1617 module_init(lov_init);
1618 module_exit(lov_exit);