1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Copyright (C) 2002 Cluster File Systems, Inc.
7 * Author: Phil Schwan <phil@off.net>
9 * This code is issued under the GNU General Public License.
10 * See the file COPYING in this distribution
14 #define DEBUG_SUBSYSTEM S_LOV
16 #include <linux/slab.h>
17 #include <linux/module.h>
18 #include <linux/obd_support.h>
19 #include <linux/lustre_lib.h>
20 #include <linux/lustre_net.h>
21 #include <linux/lustre_idl.h>
22 #include <linux/obd_class.h>
23 #include <linux/obd_lov.h>
/* Table of OBD devices with MAX_OBD_DEVICES entries, defined in another
 * translation unit.  It is not referenced on any line visible in this
 * listing. */
25 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
/*
 * lov_getinfo - send an MDS_LOVINFO request through the MDC attached to this
 * LOV and hand back the LOV descriptor plus the target UUID array.
 *
 * obd:     LOV device; the MDC is reached through obd->u.lov.mdcobd.
 * desc:    filled by memcpy() from reply buffer 0.
 * uuids:   *uuids is set to reply buffer 1 (its parameter declaration is on
 *          a line not shown in this listing; only the assignment is visible).
 * request: presumably receives the request so the caller can release it
 *          after using the reply -- TODO confirm, the assignment is not
 *          visible here.
 *
 * NOTE(review): this listing omits some original lines (braces, guards,
 * labels, the return), so the comments describe only what is visible.
 */
29 static int lov_getinfo(struct obd_device *obd,
30 struct lov_desc *desc,
32 struct ptlrpc_request **request)
34 struct ptlrpc_request *req;
35 struct mds_status_req *streq;
36 struct lov_obd *lov = &obd->u.lov;
37 struct mdc_obd *mdc = &lov->mdcobd->u.mdc;
/* Only size[0] has an initializer; size[1] therefore starts out as 0. */
38 int rc, size[2] = {sizeof(*streq)};
/* Prepare the request on the MDC's client and connection; the arguments
 * 1 and size are presumably the buffer count and buffer lengths. */
41 req = ptlrpc_prep_req2(mdc->mdc_client, mdc->mdc_conn, &mdc->mdc_connh,
42 MDS_LOVINFO, 1, size, NULL);
/* -ENOMEM exit; the test guarding it is on a line not shown. */
44 GOTO(out, rc = -ENOMEM);
/* Fill request buffer 0; both fields go through HTON__u32 first. */
47 streq = lustre_msg_buf(req->rq_reqmsg, 0);
48 streq->flags = HTON__u32(MDS_STATUS_LOV);
/* NOTE(review): 8000 is an unexplained constant -- confirm its meaning. */
49 streq->repbuf = HTON__u32(8000);
51 /* prepare for reply */
52 req->rq_level = LUSTRE_CONN_CON;
/* size[] is reused to describe the reply.  NOTE(review): the reply length is
 * computed for two buffers, but no visible line assigns size[1]; confirm it
 * is set on an omitted line, otherwise buffer 1 is sized as 0. */
53 size[0] = sizeof(*desc);
55 req->rq_replen = lustre_msg_size(2, size);
/* The result of the wait is passed to ptlrpc_check_status(), whose return
 * value replaces rc. */
57 rc = ptlrpc_queue_wait(req);
58 rc = ptlrpc_check_status(req, rc);
/* desc is copied out of the reply, whereas *uuids is set to the pointer
 * returned by lustre_msg_buf() -- presumably a pointer into the reply
 * message that is only valid until the request is freed (TODO confirm). */
61 memcpy(desc, lustre_msg_buf(req->rq_repmsg, 0), sizeof(*desc));
62 *uuids = lustre_msg_buf(req->rq_repmsg, 1);
/*
 * lov_connect - connect to a LOV device.
 *
 * Visible steps: register the connection with class_connect(), fetch the LOV
 * descriptor and target UUID array with lov_getinfo(), validate them,
 * allocate lov->tgts, copy the UUIDs in, and obd_connect() to every target.
 * The tail of the function disconnects the targets, frees the table, calls
 * class_disconnect() and frees the request.
 *
 * conn: connection handle passed to class_connect()/class_disconnect().
 * obd:  the LOV device; state is kept in obd->u.lov.
 *
 * NOTE(review): this listing omits some original lines (braces, guards,
 * labels, declarations of rc/i/uuidarray, the return), so which cleanup
 * steps run on success versus failure cannot be determined from here.
 */
70 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
/* NOTE(review): req has no initializer, yet it is handed to
 * ptlrpc_free_req() at the end; confirm every path reaching that call has
 * set it. */
74 struct ptlrpc_request *req;
75 struct lov_obd *lov = &obd->u.lov;
79 rc = class_connect(conn, obd);
/* Fills lov->desc and points uuidarray at the UUID list from the reply. */
85 rc = lov_getinfo(obd, &lov->desc, &uuidarray, &req);
87 CERROR("cannot get lov info %d\n", rc);
/* Reject descriptors that claim more than 1000 targets. */
91 if (lov->desc.ld_tgt_count > 1000) {
92 CERROR("configuration error: target count > 1000 (%d)\n",
93 lov->desc.ld_tgt_count);
94 GOTO(out, rc = -EINVAL);
/* The descriptor's ld_uuid must equal this device's obd_uuid. */
97 if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) {
98 CERROR("lov uuid %s not on mds device (%s)\n",
99 obd->obd_uuid, lov->desc.ld_uuid);
100 GOTO(out, rc = -EINVAL);
/* The reply needs at least two buffers, and buffer 1 must hold one uuid_t
 * per target. */
103 if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] <
104 sizeof(uuid_t) * lov->desc.ld_tgt_count) {
105 CERROR("invalid uuid array returned\n");
106 GOTO(out, rc = -EINVAL);
/* Allocate one lov_tgt_desc per target; the byte count is remembered in
 * lov->bufsize and reused for OBD_FREE. */
109 lov->bufsize = sizeof(struct lov_tgt_desc) * lov->desc.ld_tgt_count;
110 OBD_ALLOC(lov->tgts, lov->bufsize);
112 CERROR("Out of memory\n");
113 GOTO(out, rc = -ENOMEM);
/* Copy the UUIDs out of the reply into the target table. */
116 uuidarray = lustre_msg_buf(req->rq_repmsg, 1);
117 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
118 memcpy(lov->tgts[i].uuid, uuidarray[i], sizeof(uuid_t));
/* Look each target device up by UUID and connect to it. */
121 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
122 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
124 CERROR("Target %s not configured\n", uuidarray[i]);
125 GOTO(out_mem, rc = -EINVAL);
127 rc = obd_connect(&lov->tgts[i].conn, tgt);
129 CERROR("Target %s connect error %d\n",
/* Cleanup: disconnect the targets and release the table.
 * NOTE(review): the loop bound is the full ld_tgt_count; if this is reached
 * from a failure part-way through the connect loop above, targets that were
 * never connected are disconnected too, and rc is overwritten here --
 * confirm against the omitted label/goto lines. */
137 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
138 rc = obd_disconnect(&lov->tgts[i].conn);
140 CERROR("Target %s disconnect error %d\n",
143 OBD_FREE(lov->tgts, lov->bufsize);
147 class_disconnect(conn);
150 ptlrpc_free_req(req);
/*
 * lov_disconnect - tear down a LOV connection.
 *
 * Resolves the device from conn, calls obd_disconnect() on every target in
 * lov->tgts (logging failures), frees the target table, and finally calls
 * class_disconnect() on conn.
 *
 * NOTE(review): rc from the per-target disconnects is overwritten by the
 * class_disconnect() result on the last visible line, so target errors
 * appear to be logged only.  Some original lines are omitted from this
 * listing; confirm against the full source.
 */
155 static int lov_disconnect(struct lustre_handle *conn)
157 struct obd_device *obd = class_conn2obd(conn);
158 struct lov_obd *lov = &obd->u.lov;
165 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
166 rc = obd_disconnect(&lov->tgts[i].conn);
168 CERROR("Target %s disconnect error %d\n",
169 lov->tgts[i].uuid, rc);
/* Sized with the byte count recorded in lov->bufsize. */
171 OBD_FREE(lov->tgts, lov->bufsize);
177 rc = class_disconnect(conn);
/*
 * lov_setup - configure a LOV device from an ioctl data block.
 *
 * obd: device being set up; the MDC it resolves is stored in
 *      obd->u.lov.mdcobd.
 * len: not used on any visible line.
 * buf: interpreted as a struct obd_ioctl_data; inline buffer 1 carries the
 *      MDC UUID.
 *
 * NOTE(review): the return statements for the error branches are on lines
 * omitted from this listing.
 */
184 static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
186 struct obd_ioctl_data* data = buf;
187 struct lov_obd *lov = &obd->u.lov;
/* An MDC UUID must be present.  NOTE(review): the message says "osc"
 * although this is the LOV setup routine -- looks like a copy/paste slip. */
191 if (data->ioc_inllen1 < 1) {
192 CERROR("osc setup requires an MDC UUID\n");
/* Upper bound on the UUID buffer length: 37. */
196 if (data->ioc_inllen1 > 37) {
197 CERROR("mdc UUID must be less than 38 characters\n");
201 /* FIXME: we should make a connection instead perhaps to avoid
202 the mdc from walking away? The fs guarantees this. */
/* Resolve the MDC device from the UUID string in inline buffer 1. */
203 lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1);
205 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid,
/*
 * lov_getattr - fetch object attributes.
 *
 * Checks conn with class_conn2export(), then forwards the request to the
 * first sub-connection only (obd_multi_conn[0]); oa is passed through
 * unchanged.  The return statements are on lines omitted from this listing.
 */
213 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
218 if (!class_conn2export(conn))
221 rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa);
/*
 * lov_setattr - apply attributes to every sub-device.
 *
 * Checks conn with class_conn2export(), then calls obd_setattr() with the
 * same oa on each of the obd_multi_count sub-connections.  When a result
 * differs from retval a message is logged; the line that assigns retval is
 * omitted from this listing (presumably the first iteration's result --
 * TODO confirm).
 */
225 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa)
230 if (!class_conn2export(conn))
233 for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
234 rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa);
237 else if (retval != rc)
238 CERROR("different results on multiple OBDs!\n");
/*
 * lov_open - open the object on every sub-device.
 *
 * Checks conn with class_conn2export(), then calls obd_open() with the same
 * oa on each of the obd_multi_count sub-connections, logging when a result
 * differs from retval.  The assignment of retval and the return statements
 * are on lines omitted from this listing.
 */
244 static int lov_open(struct lustre_handle *conn, struct obdo *oa)
249 if (!class_conn2export(conn))
252 for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
253 rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa);
256 else if (retval != rc)
257 CERROR("different results on multiple OBDs!\n");
/*
 * lov_close - close the object on every sub-device.
 *
 * Checks conn with class_conn2export(), then calls obd_close() with the same
 * oa on each of the obd_multi_count sub-connections, logging when a result
 * differs from retval.  The assignment of retval and the return statements
 * are on lines omitted from this listing.
 */
263 static int lov_close(struct lustre_handle *conn, struct obdo *oa)
268 if (!class_conn2export(conn))
271 for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
272 rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa);
275 else if (retval != rc)
276 CERROR("different results on multiple OBDs!\n");
/*
 * lov_create - create one object per sub-device and record the per-stripe
 * object ids inside oa->o_inline.
 *
 * Visible layout of o_inline after the call: a struct lov_md (object id and
 * stripe count) copied to the start, and lov_object_id records written at
 * oa->o_inline + offset, one per stripe, each holding the stripe index and
 * the id reported by that sub-device.
 *
 * NOTE(review): declarations of md and tmp, the initial value of offset, and
 * the return are on lines omitted from this listing.
 */
282 static int lov_create(struct lustre_handle *conn, struct obdo *oa)
284 int rc, retval, i, offset;
289 if (!class_conn2export(conn))
292 md.lmd_object_id = oa->o_id;
293 md.lmd_stripe_count = conn->oc_dev->obd_multi_count;
295 memset(oa->o_inline, 0, sizeof(oa->o_inline));
297 for (i = 0; i < md.lmd_stripe_count; i++) {
/* NOTE(review): lov_id is declared as a struct object here, but below it is
 * assigned a pointer and dereferenced with ->; the declaration presumably
 * should be a pointer. */
298 struct lov_object_id lov_id;
299 rc = obd_create(&conn->oc_dev->obd_multi_conn[i], &tmp);
/* Copies the whole of tmp over *oa (the guarding condition is on a line not
 * shown); this includes o_inline, which was zeroed above. */
301 memcpy(oa, &tmp, sizeof(tmp));
303 } else if (retval != rc)
304 CERROR("return codes didn't match (%d, %d)\n",
/* Append this stripe's record.  NOTE(review): no visible check that
 * offset + sizeof(*lov_id) stays within sizeof(oa->o_inline). */
306 lov_id = (struct lov_object_id *)(oa->o_inline + offset);
307 lov_id->l_device_id = i;
308 lov_id->l_object_id = tmp.o_id;
309 offset += sizeof(*lov_id);
/* The header is written last, at the start of o_inline. */
311 memcpy(oa->o_inline, &md, sizeof(md));
/*
 * lov_destroy - destroy the per-stripe objects recorded in oa->o_inline.
 *
 * Reads a struct lov_md from the start of oa->o_inline, then walks
 * lmd_stripe_count lov_object_id records at oa->o_inline + offset.  For each
 * one a copy of oa (tmp) gets its o_id replaced by the record's l_object_id
 * and is passed to obd_destroy() on sub-connection i.
 *
 * NOTE(review): declarations of tmp and md, the initial value of offset, the
 * assignment of retval and the return are on lines omitted from this listing.
 */
316 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa)
318 int rc, retval, i, offset;
/* NOTE(review): this function-scope lov_id is shadowed by the pointer of the
 * same name declared inside the loop and is not used on any visible line. */
321 struct lov_object_id lov_id;
324 if (!class_conn2export(conn))
327 md = (struct lov_md *)oa->o_inline;
329 memcpy(&tmp, oa, sizeof(tmp));
332 for (i = 0; i < md->lmd_stripe_count; i++) {
333 struct lov_object_id *lov_id;
334 lov_id = (struct lov_object_id *)(oa->o_inline + offset);
336 tmp.o_id = lov_id->l_object_id;
/* NOTE(review): the sub-connection is chosen by loop index; the record's
 * l_device_id is not consulted on any visible line -- confirm the two
 * always agree. */
338 rc = obd_destroy(&conn->oc_dev->obd_multi_conn[i], &tmp);
341 else if (retval != rc)
342 CERROR("return codes didn't match (%d, %d)\n",
344 offset += sizeof(*lov_id);
350 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
351 * we can send this 'punch' to just the authoritative node and the nodes
352 * that the punch will affect. */
/*
 * lov_punch - forward a punch request to every sub-device.
 *
 * Checks conn with class_conn2export(), then calls obd_punch() with oa and
 * count (further arguments are on a line not shown) on each of the
 * obd_multi_count sub-connections, logging when a result differs from
 * retval.  The assignment of retval and the return statements are on lines
 * omitted from this listing.
 */
353 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
354 obd_size count, obd_off offset)
359 if (!class_conn2export(conn))
362 for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
363 rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count,
367 else if (retval != rc)
368 CERROR("different results on multiple OBDs!\n");
/*
 * Shared state between lov_brw() and its bulk-completion callback.
 *
 * Only the waitq member is visible in this listing.  Code in this file also
 * uses cb_data->count (through atomic_* calls) and cb_data->flags, so those
 * members are presumably declared on the omitted lines.
 */
374 struct lov_callback_data {
/* NOTE(review): spelled wait_queue_head here; the Linux kernel type is
 * wait_queue_head_t -- confirm this compiles. */
376 wait_queue_head waitq;
/*
 * lov_read_callback - bulk-completion callback used by lov_brw().
 *
 * data is the struct lov_callback_data shared with the submitter.  Each call
 * decrements cb_data->count; when atomic_dec_and_test() reports that the
 * counter reached zero, the waiter on cb_data->waitq is woken.  desc is not
 * used on any visible line.
 */
379 static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
381 struct lov_callback_data *cb_data = data;
383 if (atomic_dec_and_test(&cb_data->count))
384 wake_up(&cb_data->waitq);
/*
 * lov_read_check_status - wake-up condition used with wait_event() in
 * lov_brw().
 *
 * Two conditions are tested on the visible lines: a pending SIGKILL, SIGTERM
 * or SIGINT on the current task (which also sets PTL_RPC_FL_INTR in
 * cb_data->flags), and the outstanding-request counter having dropped to
 * zero.  The values returned for each case are on lines omitted from this
 * listing.
 */
387 static int lov_read_check_status(struct lov_callback_data *cb_data)
390 if (sigismember(&(current->pending.signal), SIGKILL) ||
391 sigismember(&(current->pending.signal), SIGTERM) ||
392 sigismember(&(current->pending.signal), SIGINT)) {
393 cb_data->flags |= PTL_RPC_FL_INTR;
396 if (atomic_read(&cb_data->count) == 0)
401 /* buffer must lie in user memory here */
/*
 * lov_brw - bulk read/write across the sub-devices.
 *
 * Visible outline: allocate a lov_callback_data, walk the caller's page
 * arrays, then walk the byte range stripe by stripe, issuing one obd_brw()
 * per piece with lov_read_callback as the completion callback and bumping
 * cb_data->count for each.  It then waits on cb_data->waitq until
 * lov_read_check_status() is satisfied and frees cb_data.
 *
 * NOTE(review): this function looks unfinished and is unlikely to compile as
 * shown; see the notes below.  A parameter line (presumably the obdo array
 * named oa) and many body lines are omitted from this listing.  The
 * caller-supplied callback and data parameters are not used on any visible
 * line.
 */
402 static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
404 obd_count *oa_bufs, struct page **buf,
405 obd_size *count, obd_off *offset, obd_flag *flags,
406 bulk_callback_t callback, void *data)
408 int rc, i, page_array_offset = 0;
/* NOTE(review): offset is declared obd_off * in the signature, but is used
 * here to initialize a scalar obd_off. */
409 obd_off off = offset;
411 struct lov_callback_data *cb_data;
417 if (!class_conn2export(conn))
420 OBD_ALLOC(cb_data, sizeof(*cb_data));
421 if (cb_data == NULL) {
425 INIT_WAITQUEUE_HEAD(&cb_data->waitq);
426 atomic_set(&cb_data->count, 0);
/* NOTE(review): the loop bound is oa_bufs[0] while the body indexes
 * oa_bufs[i] and oa[i]; num_oa looks like the intended bound -- confirm. */
428 for (i = 0; i < oa_bufs[0]; i++) {
429 struct page *current_page = buf[i];
/* NOTE(review): "inline" is a C keyword and cannot be a member name; other
 * functions in this file use the o_inline member. */
431 struct lov_md *md = (struct lov_md *)oa[i]->inline;
432 int bufcount = oa_bufs[i];
433 // md->lmd_stripe_count
435 for (k = page_array_offset; k < bufcount + page_array_offset;
439 page_array_offset += bufcount;
/* NOTE(review): offset and count are both pointers per the signature, so
 * "offset + count" is not a valid byte-range end. */
442 while (off < offset + count) {
/* Stripe index for the current offset and bytes left in that stripe.
 * NOTE(review): member is spelled lov_stripe_size on one line and
 * lov_strip_size on the next. */
446 stripe = off / conn->oc_dev->u.lov.lov_stripe_size;
447 size = (stripe + 1) * conn->oc_dev->u.lov.lov_strip_size - off;
/* NOTE(review): conn is the struct lustre_handle * parameter; it is
 * assigned an integer here and then used both as a pointer (conn->oc_dev)
 * and as an array index below. */
451 conn = stripe % conn->oc_dev->obd_multi_count;
/* One outstanding request per piece; lov_read_callback decrements. */
454 atomic_inc(&cb_data->count);
455 rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
457 &size, off, lov_read_callback, cb_data);
461 CERROR("read(off=%Lu, count=%Lu): %d\n",
462 (unsigned long long)off,
463 (unsigned long long)size, rc);
/* Block until lov_read_check_status() reports completion or a signal. */
470 wait_event(&cb_data->waitq, lov_read_check_status(cb_data));
471 if (cb_data->flags & PTL_RPC_FL_INTR)
474 /* FIXME: The error handling here sucks */
476 OBD_FREE(cb_data, sizeof(*cb_data));
/*
 * lov_write_finished - callback taking a bulk descriptor and an opaque data
 * pointer (the same parameter list as lov_read_callback).  Its body is on
 * lines omitted from this listing, and no visible line references it.
 */
480 static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
485 /* buffer must lie in user memory here */
/*
 * filter_write - write *count bytes from buf to the file backing object
 * oa->o_id, starting at offset, using the file's f_op->write method.
 *
 * NOTE(review): the name and the filter_obj_open() helper belong to the
 * filter driver rather than LOV, and lov_obd_ops does not reference this
 * function on any visible line; it looks like pasted-in reference code.  The
 * tail of the function is omitted from this listing.
 */
486 static int filter_write(struct lustre_handle *conn, struct obdo *oa, char *buf,
487 obd_size *count, obd_off offset)
491 unsigned long retval;
494 if (!class_conn2export(conn)) {
495 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
500 file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
501 if (!file || IS_ERR(file)) {
/* NOTE(review): PTR_ERR() of an error pointer is already a negative errno,
 * so negating it returns a positive value; and when file is NULL,
 * PTR_ERR() is 0, so this branch returns 0 -- confirm the intended
 * error convention. */
503 return -PTR_ERR(file);
506 /* count doubles as retval */
/* offset is passed by address, cast to loff_t *, so the write method can
 * advance the local copy. */
507 retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
/*
 * lov_enqueue - request a lock.
 *
 * Checks conn with class_conn2export(), then passes every argument through
 * unchanged to obd_enqueue() on the first sub-connection only
 * (obd_multi_conn[0]).  The final argument and the return statements are on
 * lines omitted from this listing.
 */
523 static int lov_enqueue(struct lustre_handle *conn, struct ldlm_namespace *ns,
524 struct ldlm_handle *parent_lock, __u64 *res_id,
525 __u32 type, struct ldlm_extent *extent, __u32 mode,
526 int *flags, void *data, int datalen,
527 struct ldlm_handle *lockh)
532 if (!class_conn2export(conn))
535 rc = obd_enqueue(&conn->oc_dev->obd_multi_conn[0], ns, parent_lock,
536 res_id, type, extent, mode, flags, data, datalen,
/*
 * lov_cancel - cancel a lock on the first sub-connection
 * (obd_multi_conn[0]) after checking conn with class_conn2export().
 *
 * NOTE(review): the obd_cancel() call passes "oa", which is neither a
 * parameter nor declared on any visible line, while the mode and lockh
 * parameters are not used; this looks like a copy of lov_getattr that was
 * never adapted.  lov_obd_ops also does not list this function on any
 * visible line.
 */
541 static int lov_cancel(struct lustre_handle *conn, __u32 mode,
542 struct ldlm_handle *lockh)
547 if (!class_conn2export(conn))
550 rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
/*
 * Method table registered for the LOV device type by lov_init().
 *
 * Uses the old GNU "field: value" initializer syntax.  Some entries are on
 * lines omitted from this listing, so the set below may be incomplete.
 */
555 struct obd_ops lov_obd_ops = {
557 o_connect: lov_connect,
558 o_disconnect: lov_disconnect,
560 o_create: lov_create,
561 o_getattr: lov_getattr,
562 o_setattr: lov_setattr,
565 o_destroy: lov_destroy,
/* NOTE(review): lov_pgcache_brw is not defined on any visible line of this
 * file; the bulk I/O routine defined here is named lov_brw -- confirm. */
566 o_brw: lov_pgcache_brw,
568 o_enqueue: lov_enqueue,
/* Version string spliced into the load-time banner below. */
574 #define LOV_VERSION "v0.1"
/*
 * lov_init - module entry point: prints a banner and registers lov_obd_ops
 * under the name OBD_LOV_DEVICENAME.  Returns whatever
 * class_register_type() returns.
 */
576 static int __init lov_init(void)
578 printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
579 ", info@clusterfs.com\n");
580 return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
/*
 * lov_exit - module exit point: unregisters the OBD_LOV_DEVICENAME type that
 * lov_init() registered.
 */
583 static void __exit lov_exit(void)
585 class_unregister_type(OBD_LOV_DEVICENAME);
/* Module metadata, followed by registration of lov_init/lov_exit as the
 * module's load and unload entry points. */
588 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
589 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
590 MODULE_LICENSE("GPL");
592 module_init(lov_init);
593 module_exit(lov_exit);