1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Copyright (C) 2002 Cluster File Systems, Inc.
7 * Author: Phil Schwan <phil@off.net>
9 * This code is issued under the GNU General Public License.
10 * See the file COPYING in this distribution
14 #define DEBUG_SUBSYSTEM S_LOV
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/obd_support.h>
19 #include <linux/lustre_lib.h>
20 #include <linux/lustre_net.h>
21 #include <linux/lustre_idl.h>
22 #include <linux/lustre_mds.h>
23 #include <linux/obd_class.h>
24 #include <linux/obd_lov.h>
25 #include <linux/init.h>
/* Array of OBD devices with MAX_OBD_DEVICES entries; declared extern here,
 * the definition is not part of this listing. */
27 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
/*
 * lov_connect - connect to a LOV device and to each of its targets.
 *
 * conn: handle filled in by class_connect() for this connection.
 * obd:  the LOV device; its private state is obd->u.lov.
 *
 * Visible flow: register the connection with class_connect(), connect to the
 * MDC (lov->mdcobd) just long enough to call mdc_getlovinfo(), sanity-check
 * the descriptor and the reply buffers, allocate lov->tgts, copy the target
 * UUIDs into it and obd_connect() to every target.  The tail disconnects
 * targets, frees lov->tgts, drops the class connection and frees the request.
 *
 * NOTE(review): this listing omits lines (local declarations, braces, the
 * out/out_mem labels and the return), so the exact conditions guarding each
 * step cannot be confirmed from here.
 */
31 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
33 struct ptlrpc_request *req;
34 struct lov_obd *lov = &obd->u.lov;
35 struct lustre_handle mdc_conn;
41 rc = class_connect(conn, obd);
/* Temporary connection to the MDC, used only for the getlovinfo call. */
47 rc = obd_connect(&mdc_conn, lov->mdcobd);
49 CERROR("cannot connect to mdc: rc = %d\n", rc);
50 GOTO(out, rc = -EINVAL);
/* presumably fills lov->desc and returns the reply in req -- TODO confirm */
53 rc = mdc_getlovinfo(obd, &mdc_conn, &uuidarray, &req);
54 obd_disconnect(&mdc_conn);
57 CERROR("cannot get lov info %d\n", rc);
/* Refuse descriptors that claim more than 1000 targets. */
62 if (lov->desc.ld_tgt_count > 1000) {
63 CERROR("configuration error: target count > 1000 (%d)\n",
64 lov->desc.ld_tgt_count);
65 GOTO(out, rc = -EINVAL);
/* The UUID in the descriptor must be the UUID of this LOV device. */
68 if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) {
69 CERROR("lov uuid %s not on mds device (%s)\n",
70 obd->obd_uuid, lov->desc.ld_uuid);
71 GOTO(out, rc = -EINVAL);
/* The reply needs a second buffer holding at least ld_tgt_count uuid_t's. */
74 if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] <
75 sizeof(uuid_t) * lov->desc.ld_tgt_count) {
76 CERROR("invalid uuid array returned\n");
77 GOTO(out, rc = -EINVAL);
/* bufsize is remembered so the same size can be passed to OBD_FREE later. */
80 lov->bufsize = sizeof(struct lov_tgt_desc) * lov->desc.ld_tgt_count;
81 OBD_ALLOC(lov->tgts, lov->bufsize);
83 CERROR("Out of memory\n");
84 GOTO(out, rc = -ENOMEM);
/* Copy each target UUID out of reply buffer 1 into the target table. */
87 uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
88 for (i = 0 ; i < lov->desc.ld_tgt_count; i++)
89 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(uuid_t));
/* Look up every target device by UUID and connect to it. */
91 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
92 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
94 CERROR("Target %s not configured\n", uuidarray[i]);
95 GOTO(out_mem, rc = -EINVAL);
97 rc = obd_connect(&lov->tgts[i].conn, tgt);
99 CERROR("Target %s connect error %d\n",
/*
 * Unwind (labels not visible in this listing): disconnect targets and free
 * the target table.
 * NOTE(review): the loop covers all ld_tgt_count entries, which may include
 * targets that were never connected -- confirm against the full source.
 */
107 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
109 rc2 = obd_disconnect(&lov->tgts[i].conn);
111 CERROR("BAD: Target %s disconnect error %d\n",
114 OBD_FREE(lov->tgts, lov->bufsize);
118 class_disconnect(conn);
120 ptlrpc_free_req(req);
/*
 * lov_disconnect - tear down a LOV connection.
 *
 * Resolves the device from conn, calls obd_disconnect() on every entry of
 * lov->tgts (logging failures), frees the target table with the size saved
 * in lov->bufsize, and finally releases the class-level connection.
 *
 * NOTE(review): the conditions around the loop and the free are on lines
 * missing from this listing.
 */
124 static int lov_disconnect(struct lustre_handle *conn)
126 struct obd_device *obd = class_conn2obd(conn);
127 struct lov_obd *lov = &obd->u.lov;
134 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
135 rc = obd_disconnect(&lov->tgts[i].conn);
137 CERROR("Target %s disconnect error %d\n",
138 lov->tgts[i].uuid, rc);
142 OBD_FREE(lov->tgts, lov->bufsize);
148 rc = class_disconnect(conn);
/*
 * lov_setup - configure a LOV device from an ioctl data buffer.
 *
 * obd: the LOV device being set up.
 * len: length of buf (not used in the visible lines).
 * buf: interpreted as a struct obd_ioctl_data; its first inline buffer
 *      carries the UUID of the MDC this LOV belongs to.
 *
 * The inline buffer length is checked (below 1 and above 37 are reported as
 * errors), then the MDC device is looked up by UUID and stored in
 * lov->mdcobd.  Return statements are on lines missing from this listing.
 *
 * NOTE(review): the first error message says "osc setup" although this is
 * the LOV setup path -- looks like a copy/paste leftover; confirm before
 * changing the string.
 */
155 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
157 struct obd_ioctl_data* data = buf;
158 struct lov_obd *lov = &obd->u.lov;
162 if (data->ioc_inllen1 < 1) {
163 CERROR("osc setup requires an MDC UUID\n");
167 if (data->ioc_inllen1 > 37) {
168 CERROR("mdc UUID must be less than 38 characters\n");
172 /* FIXME: we should make a connection instead perhaps to avoid
173 the mdc from walking away? The fs guarantees this. */
/* Only a device pointer is kept here; no connection to the MDC is made. */
174 lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1);
176 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid,
/*
 * lov_stripe_md_size - size in bytes of a striping descriptor for this LOV.
 *
 * Computed as the fixed struct lov_stripe_md header plus one
 * struct lov_object_id per configured target (lov->desc.ld_tgt_count).
 */
184 static inline int lov_stripe_md_size(struct obd_device *obd)
186 struct lov_obd *lov = &obd->u.lov;
189 size = sizeof(struct lov_stripe_md) +
190 lov->desc.ld_tgt_count * sizeof(struct lov_object_id);
/*
 * lov_create - create one data object per stripe on the LOV targets.
 *
 * conn: handle of the LOV export.
 * oa:   "parent" object attributes; copied into a scratch obdo for every
 *       per-target create.  oa->o_easize is set to the striping EA size.
 * ea:   in/out striping descriptor; storage for it is obtained with
 *       OBD_ALLOC (the guarding condition is not visible in this listing).
 *
 * For each stripe i an object is created on target i with obd_create() and
 * its id is recorded in md->lmd_objects[i].  When a create fails, the
 * objects created so far (indices 0 .. i-1) are destroyed again.
 *
 * Fix: the cleanup loop iterates with i2 but used to index lmd_objects[] and
 * tgts[] with i, i.e. the stripe whose create failed, so none of the objects
 * that had actually been created were removed.  It now indexes with i2.
 */
194 static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea)
198 struct obd_export *export = class_conn2export(conn);
200 struct lov_stripe_md *md;
204 CERROR("lov_create needs EA for striping information\n");
210 lov = &export->exp_obd->u.lov;
212 oa->o_easize = lov_stripe_md_size(export->exp_obd);
214 OBD_ALLOC(*ea, oa->o_easize);
220 md->lmd_size = oa->o_easize;
221 md->lmd_object_id = oa->o_id;
/* A stripe count of zero selects the LOV-wide default. */
222 if (!md->lmd_stripe_count) {
223 md->lmd_stripe_count = lov->desc.ld_default_stripecount;
226 for (i = 0; i < md->lmd_stripe_count; i++) {
227 struct lov_stripe_md obj_md;
228 struct lov_stripe_md *obj_mdp = &obj_md;
229 /* create data objects with "parent" OA */
230 memcpy(&tmp, oa, sizeof(tmp));
231 tmp.o_easize = sizeof(struct lov_stripe_md);
232 rc = obd_create(&lov->tgts[i].conn, &tmp, &obj_mdp);
234 GOTO(out_cleanup, rc);
235 md->lmd_objects[i].l_object_id = tmp.o_id;
/* Roll back: i is the stripe that failed, so only 0 .. i-1 exist. */
241 for (i2 = 0 ; i2 < i ; i2++) {
242 /* destroy already created objects here */
243 tmp.o_id = md->lmd_objects[i2].l_object_id;
244 rc2 = obd_destroy(&lov->tgts[i2].conn, &tmp, NULL);
246 CERROR("Failed to remove object from target %d\n",
/*
 * lov_destroy - destroy the per-stripe objects of a striped object.
 *
 * conn: handle of the LOV export.
 * oa:   "parent" object attributes; used as a template only.
 * ea:   striping descriptor listing the object id held by each target.
 *
 * For each stripe i the parent obdo is copied into a scratch obdo, the
 * scratch id is set to the stripe's object id and obd_destroy() is called
 * on target i.  Failures are logged.
 *
 * Fix: the stripe object id used to be written to oa->o_id after oa had
 * already been copied into tmp, so obd_destroy() received the parent id and
 * the caller's oa was modified.  The id is now written to tmp.o_id.
 */
254 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
255 struct lov_stripe_md *ea)
259 struct obd_export *export = class_conn2export(conn);
261 struct lov_stripe_md *md;
265 CERROR("LOV requires striping ea for desctruction\n");
269 if (!export || !export->exp_obd)
272 lov = &export->exp_obd->u.lov;
275 for (i = 0; i < md->lmd_stripe_count; i++) {
276 /* destroy data objects using a copy of the "parent" OA */
277 memcpy(&tmp, oa, sizeof(tmp));
278 tmp.o_id = md->lmd_objects[i].l_object_id;
279 rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL);
281 CERROR("Error destroying object %Ld on %d\n",
/*
 * lov_getattr - fetch object attributes into oa.
 *
 * Checks that conn resolves to an export (the failure branch is not visible
 * in this listing) and forwards the request to the first underlying
 * connection only, obd_multi_conn[0].
 */
289 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
294 if (!class_conn2export(conn))
297 rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa);
/*
 * lov_setattr - apply the attributes in oa on every underlying connection.
 *
 * Iterates over obd_multi_conn[0 .. obd_multi_count) calling obd_setattr()
 * and logs an error when a result differs from retval.
 * NOTE(review): how retval is first assigned and what is returned is on
 * lines missing from this listing.
 */
301 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa)
306 if (!class_conn2export(conn))
309 for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
310 rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa);
313 else if (retval != rc)
314 CERROR("different results on multiple OBDs!\n");
/*
 * lov_open - open the object described by oa on every underlying connection.
 *
 * Iterates over obd_multi_conn[0 .. obd_multi_count) calling obd_open() and
 * logs an error when a result differs from retval.
 * NOTE(review): how retval is first assigned and what is returned is on
 * lines missing from this listing.
 */
320 static int lov_open(struct lustre_handle *conn, struct obdo *oa)
325 if (!class_conn2export(conn))
328 for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
329 rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa);
332 else if (retval != rc)
333 CERROR("different results on multiple OBDs!\n");
/*
 * lov_close - close the object described by oa on every underlying
 * connection.
 *
 * Iterates over obd_multi_conn[0 .. obd_multi_count) calling obd_close() and
 * logs an error when a result differs from retval.
 * NOTE(review): how retval is first assigned and what is returned is on
 * lines missing from this listing.
 */
339 static int lov_close(struct lustre_handle *conn, struct obdo *oa)
344 if (!class_conn2export(conn))
347 for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
348 rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa);
351 else if (retval != rc)
352 CERROR("different results on multiple OBDs!\n");
360 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
361 * we can send this 'punch' to just the authoritative node and the nodes
362 * that the punch will affect. */
/*
 * lov_punch - forward a punch request to every underlying connection.
 *
 * conn:   handle of the LOV connection.
 * oa:     object attributes identifying the object.
 * count:  passed through to obd_punch().
 * offset: presumably passed through as well; the continuation line of the
 *         call is missing from this listing.
 *
 * Logs an error when a result differs from retval.
 */
363 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
364 obd_size count, obd_off offset)
369 if (!class_conn2export(conn))
372 for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
373 rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count,
377 else if (retval != rc)
378 CERROR("different results on multiple OBDs!\n");
/*
 * State shared between lov_brw() and its bulk completion callback.
 * Only waitq is visible in this listing; the code below also uses an atomic
 * ->count and a ->flags member.
 * NOTE(review): the Linux wait queue type is normally spelled
 * wait_queue_head_t -- confirm that wait_queue_head is defined somewhere.
 */
384 struct lov_callback_data {
386 wait_queue_head waitq;
/*
 * lov_read_callback - bulk completion callback used by lov_brw().
 *
 * desc: the bulk descriptor (unused in the visible lines).
 * data: the struct lov_callback_data shared with the submitter.
 *
 * Decrements the outstanding-request counter and wakes the waiter once it
 * reaches zero.
 */
389 static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
391 struct lov_callback_data *cb_data = data;
393 if (atomic_dec_and_test(&cb_data->count))
394 wake_up(&cb_data->waitq);
/*
 * lov_read_check_status - wait condition used by lov_brw().
 *
 * Marks the callback data with PTL_RPC_FL_INTR when SIGKILL, SIGTERM or
 * SIGINT is pending for the current task, and tests whether the outstanding
 * request counter has dropped to zero.
 * NOTE(review): the return statements are missing from this listing;
 * presumably non-zero is returned in both cases so the waiter wakes up.
 */
397 static int lov_read_check_status(struct lov_callback_data *cb_data)
400 if (sigismember(&(current->pending.signal), SIGKILL) ||
401 sigismember(&(current->pending.signal), SIGTERM) ||
402 sigismember(&(current->pending.signal), SIGINT)) {
403 cb_data->flags |= PTL_RPC_FL_INTR;
406 if (atomic_read(&cb_data->count) == 0)
411 /* buffer must lie in user memory here */
/*
 * lov_brw - striped bulk read/write across the underlying connections.
 *
 * Visible flow: allocate a struct lov_callback_data, initialise its wait
 * queue and counter, walk the request stripe by stripe issuing obd_brw()
 * with lov_read_callback as completion callback (incrementing the counter
 * per request), wait until lov_read_check_status() is satisfied, then free
 * the callback data.
 *
 * NOTE(review): this function looks unfinished in the visible lines:
 *  - off is an obd_off initialised from offset, which is an obd_off *;
 *    offset + count likewise mixes pointers;
 *  - conn (a struct lustre_handle *) is assigned an integer stripe index and
 *    then used both as a pointer and as an array index;
 *  - the stripe size field is spelled lov_stripe_size on one line and
 *    lov_strip_size on the next;
 *  - oa is used but its parameter line is missing from this listing.
 * Confirm against the full source before relying on any of it.
 */
412 static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
414 obd_count *oa_bufs, struct page **buf,
415 obd_size *count, obd_off *offset, obd_flag *flags,
416 bulk_callback_t callback, void *data)
418 int rc, i, page_array_offset = 0;
419 obd_off off = offset;
421 struct lov_callback_data *cb_data;
427 if (!class_conn2export(conn))
430 OBD_ALLOC(cb_data, sizeof(*cb_data));
431 if (cb_data == NULL) {
435 INIT_WAITQUEUE_HEAD(&cb_data->waitq);
436 atomic_set(&cb_data->count, 0);
438 for (i = 0; i < oa_bufs[0]; i++) {
439 struct page *current_page = buf[i];
441 struct lov_md *md = (struct lov_md *)oa[i]->inline;
442 int bufcount = oa_bufs[i];
443 // md->lmd_stripe_count
445 for (k = page_array_offset; k < bufcount + page_array_offset;
449 page_array_offset += bufcount;
452 while (off < offset + count) {
/* Stripe index containing off, and the bytes left up to the stripe's end. */
456 stripe = off / conn->oc_dev->u.lov.lov_stripe_size;
457 size = (stripe + 1) * conn->oc_dev->u.lov.lov_strip_size - off;
/* Stripes are assigned to the underlying connections round-robin. */
461 conn = stripe % conn->oc_dev->obd_multi_count;
/* One more request in flight; lov_read_callback() decrements this. */
464 atomic_inc(&cb_data->count);
465 rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
467 &size, off, lov_read_callback, cb_data);
471 CERROR("read(off=%Lu, count=%Lu): %d\n",
472 (unsigned long long)off,
473 (unsigned long long)size, rc);
/* Wait for the requests to complete or for a signal to be flagged. */
480 wait_event(&cb_data->waitq, lov_read_check_status(cb_data));
481 if (cb_data->flags & PTL_RPC_FL_INTR)
484 /* FIXME: The error handling here sucks */
486 OBD_FREE(cb_data, sizeof(*cb_data));
/*
 * lov_write_finished - bulk completion callback with the same signature as
 * lov_read_callback(); its body is not visible in this listing.
 */
490 static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
495 /* buffer must lie in user memory here */
/*
 * filter_write - write *count bytes from buf to the object at offset.
 *
 * Opens the object's backing file with filter_obj_open() and calls the
 * file's f_op->write() method.
 *
 * NOTE(review): the name and the filter_obj_open() call suggest this was
 * carried over from the filter driver; confirm it belongs in this file.
 * NOTE(review): PTR_ERR() normally already yields a negative errno, so
 * -PTR_ERR(file) would return a positive value, and for a NULL file it
 * yields 0 -- confirm the intended error return.
 */
496 static int filter_write(struct lustre_handle *conn, struct obdo *oa, char *buf,
497 obd_size *count, obd_off offset)
501 unsigned long retval;
504 if (!class_conn2export(conn)) {
505 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
510 file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
511 if (!file || IS_ERR(file)) {
513 return -PTR_ERR(file);
516 /* count doubles as retval */
517 retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
/*
 * lov_enqueue - enqueue a lock request.
 *
 * All arguments are passed through unchanged to obd_enqueue() on the first
 * underlying connection only, obd_multi_conn[0].  lockh is presumably the
 * final argument; the continuation line is missing from this listing.
 */
533 static int lov_enqueue(struct lustre_handle *conn, struct ldlm_namespace *ns,
534 struct ldlm_handle *parent_lock, __u64 *res_id,
535 __u32 type, struct ldlm_extent *extent, __u32 mode,
536 int *flags, void *data, int datalen,
537 struct ldlm_handle *lockh)
542 if (!class_conn2export(conn))
545 rc = obd_enqueue(&conn->oc_dev->obd_multi_conn[0], ns, parent_lock,
546 res_id, type, extent, mode, flags, data, datalen,
/*
 * lov_cancel - cancel a lock on the first underlying connection.
 *
 * NOTE(review): the call passes oa, which is not a parameter of this
 * function, and passes neither mode nor lockh -- looks like a copy/paste
 * from the attribute functions; confirm against the obd_cancel() prototype.
 */
551 static int lov_cancel(struct lustre_handle *conn, __u32 mode,
552 struct ldlm_handle *lockh)
557 if (!class_conn2export(conn))
560 rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
/*
 * Method table registered for the LOV device type by lov_init().
 * Uses the GNU "field: value" initializer syntax.
 *
 * NOTE(review): o_brw refers to lov_pgcache_brw, which is not defined in the
 * visible part of this file (the function above is named lov_brw).  Several
 * entries are on lines missing from this listing.
 */
565 struct obd_ops lov_obd_ops = {
567 o_connect: lov_connect,
568 o_disconnect: lov_disconnect,
569 o_create: lov_create,
570 o_destroy: lov_destroy,
572 o_getattr: lov_getattr,
573 o_setattr: lov_setattr,
576 o_brw: lov_pgcache_brw,
578 o_enqueue: lov_enqueue,
/* Version string embedded in the load-time banner. */
584 #define LOV_VERSION "v0.1"
/*
 * lov_init - module entry point.
 *
 * Prints a banner and registers lov_obd_ops under OBD_LOV_DEVICENAME;
 * returns the result of class_register_type().
 */
586 static int __init lov_init(void)
588 printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
589 ", info@clusterfs.com\n");
590 return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
/* lov_exit - module exit point; unregisters the LOV device type. */
593 static void __exit lov_exit(void)
595 class_unregister_type(OBD_LOV_DEVICENAME);
/* Module metadata and registration of the init/exit entry points. */
598 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
599 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
600 MODULE_LICENSE("GPL");
602 module_init(lov_init);
603 module_exit(lov_exit);