1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Copyright (C) 2002 Cluster File Systems, Inc.
7 * Author: Phil Schwan <phil@off.net>
9 * This code is issued under the GNU General Public License.
10 * See the file COPYING in this distribution
14 #define DEBUG_SUBSYSTEM S_LOV
16 #include <linux/module.h>
17 #include <linux/obd_class.h>
18 #include <linux/obd_lov.h>
20 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
// lov_connect: connect handler for the LOV driver (installed as o_connect in
// lov_obd_ops).  The only statement visible here forwards the handle to
// class_connect() and keeps the result in rc.
// NOTE(review): the declaration of rc and the return path are not visible in
// this listing; confirm that rc is what gets returned.
23 static int lov_connect(struct lustre_handle *conn)
28 rc = class_connect(conn);
// lov_disconnect: disconnect handler for the LOV driver (installed as
// o_disconnect in lov_obd_ops).  The visible statement forwards the handle to
// class_disconnect() and keeps the result in rc.
36 static int lov_disconnect(struct lustre_handle *conn)
40 rc = class_disconnect(conn);
// The XXX marker below records cleanup work that has not been written yet.
44 /* XXX cleanup preallocated inodes */
/* lov_getattr: fetch the attributes of oa.
 *
 * After checking that conn resolves to an export, the request is sent to the
 * first target only (obd_multi_conn[0]); the remaining targets are not
 * consulted.
 *
 * NOTE(review): the value returned when class_conn2export() fails is not
 * visible in this listing. */
48 static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
53 if (!class_conn2export(conn))
56 rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa);
/* lov_setattr: apply the attributes in oa to every target.
 *
 * One obd_setattr() call is issued per entry of obd_multi_conn[], for
 * obd_multi_count entries.  A result that differs from the remembered one
 * (retval) is only reported through CERROR; the loop keeps going.
 *
 * NOTE(review): the branch that seeds retval (the "if" paired with the
 * "else if" below) is not visible here; confirm retval is assigned on the
 * first iteration, and decide whether a mismatch should be returned to the
 * caller rather than just logged. */
60 static int lov_setattr(struct lustre_handle *conn, struct obdo *oa)
65 if (!class_conn2export(conn))
68 for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
69 rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa);
72 else if (retval != rc)
73 CERROR("different results on multiple OBDs!\n");
/* lov_open: open the object described by oa on every target.
 *
 * Calls obd_open() once per entry of obd_multi_conn[] and logs through
 * CERROR when a target's result differs from the remembered one (retval).
 *
 * NOTE(review): targets that were already opened are not visibly closed
 * again when a later target fails; confirm against the lines missing from
 * this listing. */
79 static int lov_open(struct lustre_handle *conn, struct obdo *oa)
84 if (!class_conn2export(conn))
87 for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
88 rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa);
91 else if (retval != rc)
92 CERROR("different results on multiple OBDs!\n");
/* lov_close: close the object described by oa on every target.
 *
 * Calls obd_close() once per entry of obd_multi_conn[]; a result that differs
 * from the remembered one (retval) is reported through CERROR only.
 *
 * NOTE(review): the statement that seeds retval is not visible in this
 * listing. */
98 static int lov_close(struct lustre_handle *conn, struct obdo *oa)
103 if (!class_conn2export(conn))
106 for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
107 rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa);
110 else if (retval != rc)
111 CERROR("different results on multiple OBDs!\n");
/* lov_create: create one object per target and record the stripe layout in
 * oa->o_inline.
 *
 * Visible steps:
 *   - md is filled with the caller's object id and a stripe count equal to
 *     the number of targets (obd_multi_count);
 *   - oa->o_inline is zeroed;
 *   - for each target, obd_create() is called on tmp, and a lov_object_id
 *     (device index i, object id tmp.o_id) is written into oa->o_inline at
 *     the running byte offset;
 *   - finally md is copied to the start of oa->o_inline.
 *
 * NOTE(review): the initial value of offset is not visible.  It needs to
 * start at or beyond sizeof(md), otherwise the final memcpy overwrites the
 * first lov_object_id entries.  No bound check of offset against
 * sizeof(oa->o_inline) is visible either. */
117 static int lov_create(struct lustre_handle *conn, struct obdo *oa)
119 int rc, retval, i, offset;
124 if (!class_conn2export(conn))
127 md.lmd_object_id = oa->o_id;
128 md.lmd_stripe_count = conn->oc_dev->obd_multi_count;
130 memset(oa->o_inline, 0, sizeof(oa->o_inline));
132 for (i = 0; i < md.lmd_stripe_count; i++) {
/* NOTE(review): lov_id is declared by value here but is assigned a pointer
 * and dereferenced with "->" below; the declaration looks like it should be
 * "struct lov_object_id *lov_id" (as lov_destroy declares it). */
133 struct lov_object_id lov_id;
134 rc = obd_create(&conn->oc_dev->obd_multi_conn[i], &tmp);
/* NOTE(review): copying all of tmp over *oa also replaces oa->o_inline; the
 * condition guarding this copy is not visible, so confirm it cannot wipe
 * entries already stored in o_inline. */
136 memcpy(oa, &tmp, sizeof(tmp));
138 } else if (retval != rc)
139 CERROR("return codes didn't match (%d, %d)\n",
141 lov_id = (struct lov_object_id *)(oa->o_inline + offset);
142 lov_id->l_device_id = i;
143 lov_id->l_object_id = tmp.o_id;
144 offset += sizeof(*lov_id);
146 memcpy(oa->o_inline, &md, sizeof(md));
/* lov_destroy: destroy the per-target objects listed in oa->o_inline.
 *
 * The start of oa->o_inline is read as a struct lov_md; its lmd_stripe_count
 * bounds the loop.  For each stripe the lov_object_id found at the running
 * byte offset supplies the object id, which is placed in a private copy of
 * oa (tmp) and passed to obd_destroy() on target i.
 *
 * NOTE(review): the stripe count comes from the caller's buffer and is used
 * to index obd_multi_conn[] and to walk o_inline without a visible bound
 * check.  The target is chosen by the loop index, not by
 * lov_id->l_device_id.  The initial value of offset is not visible. */
151 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa)
153 int rc, retval, i, offset;
/* NOTE(review): this function-scope lov_id is hidden by the pointer of the
 * same name declared inside the loop and appears to be unused. */
156 struct lov_object_id lov_id;
159 if (!class_conn2export(conn))
162 md = (struct lov_md *)oa->o_inline;
164 memcpy(&tmp, oa, sizeof(tmp));
167 for (i = 0; i < md->lmd_stripe_count; i++) {
168 struct lov_object_id *lov_id;
169 lov_id = (struct lov_object_id *)(oa->o_inline + offset);
171 tmp.o_id = lov_id->l_object_id;
173 rc = obd_destroy(&conn->oc_dev->obd_multi_conn[i], &tmp);
176 else if (retval != rc)
177 CERROR("return codes didn't match (%d, %d)\n",
179 offset += sizeof(*lov_id);
185 /* FIXME: maybe we'll just make one node the authoritative attribute node, then
186 * we can send this 'punch' to just the authoritative node and the nodes
187 * that the punch will affect. */
/* lov_punch: forward a punch request for oa to every target.
 *
 * obd_punch() is called once per entry of obd_multi_conn[] with the caller's
 * count (and, presumably, offset; the continuation of the call is not
 * visible in this listing).  Differing results are reported through CERROR
 * only. */
188 static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
189 obd_size count, obd_off offset)
194 if (!class_conn2export(conn))
197 for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
198 rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count,
202 else if (retval != rc)
203 CERROR("different results on multiple OBDs!\n");
/* Shared state between lov_brw() and its bulk-completion callback: the
 * callback wakes waitq when the outstanding-request counter reaches zero.
 * Other code in this file also uses ->count (with atomic_* helpers) and
 * ->flags; those members are not visible in this listing. */
209 struct lov_callback_data {
/* NOTE(review): the Linux wait-queue head type is spelled
 * wait_queue_head_t; confirm this compiles as written. */
211 wait_queue_head waitq;
/* lov_read_callback: bulk-completion callback passed to obd_brw() by
 * lov_brw().
 *
 * data is the struct lov_callback_data shared by all requests of one
 * lov_brw() call.  Each completion decrements its counter; the completion
 * that brings it to zero wakes the sleeper on waitq.  desc is not used in
 * the visible lines. */
214 static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
216 struct lov_callback_data *cb_data = data;
218 if (atomic_dec_and_test(&cb_data->count))
219 wake_up(&cb_data->waitq);
/* lov_read_check_status: wake-up condition used by lov_brw() while it waits
 * for its outstanding requests.
 *
 * If SIGKILL, SIGTERM or SIGINT is pending for the current task,
 * PTL_RPC_FL_INTR is set in cb_data->flags.  The function also tests whether
 * the outstanding-request counter has reached zero.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * presumably both cases return non-zero so that the wait ends. */
222 static int lov_read_check_status(struct lov_callback_data *cb_data)
225 if (sigismember(&(current->pending.signal), SIGKILL) ||
226 sigismember(&(current->pending.signal), SIGTERM) ||
227 sigismember(&(current->pending.signal), SIGINT)) {
228 cb_data->flags |= PTL_RPC_FL_INTR;
231 if (atomic_read(&cb_data->count) == 0)
/* lov_brw: striped bulk read/write.
 *
 * Visible outline: allocate a lov_callback_data, walk the request range one
 * stripe-sized piece at a time, pick a target from the stripe number, issue
 * obd_brw() with lov_read_callback as completion callback (incrementing the
 * outstanding counter before each request), then sleep until
 * lov_read_check_status() reports completion or a pending signal, and free
 * the callback data.
 *
 * NOTE(review): this function does not look compilable as listed; the
 * individual problems are flagged at the lines concerned.  The parameter
 * named oa used below is not among the visible parameters (a line of the
 * signature is missing from this listing). */
236 /* buffer must lie in user memory here */
237 static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
239 obd_count *oa_bufs, struct page **buf,
240 obd_size *count, obd_off *offset, obd_flag *flags,
241 bulk_callback_t callback, void *data)
243 int rc, i, page_array_offset = 0;
/* NOTE(review): offset is declared as "obd_off *", so this initialises a
 * scalar from a pointer; "*offset" or "offset[n]" was probably intended. */
244 obd_off off = offset;
246 struct lov_callback_data *cb_data;
252 if (!class_conn2export(conn))
255 OBD_ALLOC(cb_data, sizeof(*cb_data));
256 if (cb_data == NULL) {
260 INIT_WAITQUEUE_HEAD(&cb_data->waitq);
261 atomic_set(&cb_data->count, 0);
/* NOTE(review): the bound is oa_bufs[0] while the body indexes oa[i] and
 * oa_bufs[i]; num_oa looks like the intended bound - confirm. */
263 for (i = 0; i < oa_bufs[0]; i++) {
264 struct page *current_page = buf[i];
/* NOTE(review): "inline" is a C keyword and cannot be a member name; the
 * other functions in this file read the layout from "o_inline". */
266 struct lov_md *md = (struct lov_md *)oa[i]->inline;
267 int bufcount = oa_bufs[i];
268 // md->lmd_stripe_count
270 for (k = page_array_offset; k < bufcount + page_array_offset;
274 page_array_offset += bufcount;
/* NOTE(review): offset and count are both pointers here, so this adds two
 * pointers; the dereferenced values were probably intended. */
277 while (off < offset + count) {
281 stripe = off / conn->oc_dev->u.lov.lov_stripe_size;
/* NOTE(review): "lov_strip_size" differs from "lov_stripe_size" on the line
 * above; one of the two spellings is presumably a typo. */
282 size = (stripe + 1) * conn->oc_dev->u.lov.lov_strip_size - off;
/* NOTE(review): this overwrites the conn parameter (a lustre_handle
 * pointer) with an integer target index, and the obd_brw() call below then
 * uses conn both as a pointer and as an array index.  A separate integer
 * local is needed. */
286 conn = stripe % conn->oc_dev->obd_multi_count;
/* The counter is raised before the request is issued so that a completion
 * callback cannot bring it to zero early. */
289 atomic_inc(&cb_data->count);
290 rc = obd_brw(cmd, &conn->oc_dev->obd_multi_conn[conn],
292 &size, off, lov_read_callback, cb_data);
296 CERROR("read(off=%Lu, count=%Lu): %d\n",
297 (unsigned long long)off,
298 (unsigned long long)size, rc);
/* NOTE(review): the Linux wait_event_interruptible() macro takes the wait
 * queue head itself, not its address - confirm against the kernel headers
 * this builds with. */
305 wait_event_interruptible(&cb_data->waitq,
306 lov_read_check_status(cb_data));
307 if (cb_data->flags & PTL_RPC_FL_INTR)
310 /* FIXME: The error handling here sucks */
/* NOTE(review): if the wait ended because of a signal, requests may still
 * be outstanding and their callbacks would touch cb_data after this free;
 * the handling of that case is not visible in this listing. */
312 OBD_FREE(cb_data, sizeof(*cb_data));
/* lov_write_finished: callback with the same signature as
 * lov_read_callback().  Its body is not visible in this listing, and no
 * visible code refers to it. */
316 static void lov_write_finished(struct ptlrpc_bulk_desc *desc, void *data)
/* filter_write: write *count bytes from buf to the object oa at offset.
 *
 * The object is opened with filter_obj_open() and written through the
 * file's f_op->write method.
 *
 * NOTE(review): the filter_ prefix and the use of filter_obj_open() suggest
 * this was copied from the filter driver; it is not referenced by
 * lov_obd_ops.  Confirm whether it belongs in this file. */
321 /* buffer must lie in user memory here */
322 static int filter_write(struct lustre_handle *conn, struct obdo *oa, char *buf,
323 obd_size *count, obd_off offset)
/* NOTE(review): retval receives the result of ->write(), which can be a
 * negative error code; an unsigned variable cannot represent that. */
327 unsigned long retval;
330 if (!class_conn2export(conn)) {
331 CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id);
336 file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode);
337 if (!file || IS_ERR(file)) {
/* NOTE(review): PTR_ERR() already yields a negative errno for an error
 * pointer, so negating it returns a positive value; for a NULL file it
 * yields 0, which reads as success. */
339 return -PTR_ERR(file);
342 /* count doubles as retval */
/* NOTE(review): the cast assumes obd_off and loff_t have the same size and
 * representation - confirm. */
343 retval = file->f_op->write(file, buf, *count, (loff_t *)&offset);
/* lov_enqueue: lock-enqueue handler (installed as o_enqueue in lov_obd_ops).
 *
 * After checking that conn resolves to an export, all arguments are passed
 * through unchanged to obd_enqueue() on the first target only
 * (obd_multi_conn[0]); the lock is not taken on the other targets. */
359 static int lov_enqueue(struct lustre_handle *conn, struct ldlm_namespace *ns,
360 struct ldlm_handle *parent_lock, __u64 *res_id,
361 __u32 type, struct ldlm_extent *extent, __u32 mode,
362 int *flags, void *data, int datalen,
363 struct ldlm_handle *lockh)
368 if (!class_conn2export(conn))
371 rc = obd_enqueue(&conn->oc_dev->obd_multi_conn[0], ns, parent_lock,
372 res_id, type, extent, mode, flags, data, datalen,
/* lov_cancel: lock-cancel handler; forwards to obd_cancel() on the first
 * target only (obd_multi_conn[0]), matching lov_enqueue(). */
377 static int lov_cancel(struct lustre_handle *conn, __u32 mode,
378 struct ldlm_handle *lockh)
383 if (!class_conn2export(conn))
/* NOTE(review): "oa" is not a parameter of this function, while mode and
 * lockh are not used in this call; the arguments were presumably meant to
 * be (..., mode, lockh). */
386 rc = obd_cancel(&conn->oc_dev->obd_multi_conn[0], oa);
/* Method table registered for the LOV device type by lov_init().  Setup and
 * cleanup use the generic class_multi_* helpers; the remaining visible
 * entries point at the lov_* handlers defined in this file.
 *
 * The "field: value" form is the old GNU designated-initializer syntax.
 *
 * NOTE(review): no entries for lov_open, lov_close, lov_punch or lov_cancel
 * are visible in this listing; confirm they are wired up. */
391 struct obd_ops lov_obd_ops = {
392 o_setup: class_multi_setup,
393 o_cleanup: class_multi_cleanup,
394 o_create: lov_create,
395 o_connect: lov_connect,
396 o_disconnect: lov_disconnect,
397 o_getattr: lov_getattr,
398 o_setattr: lov_setattr,
402 o_destroy: lov_destroy,
/* NOTE(review): no function named lov_pgcache_brw is visible in this file;
 * the bulk handler defined here is lov_brw. */
403 o_brw: lov_pgcache_brw,
405 o_enqueue: lov_enqueue,
411 #define LOV_VERSION "v0.1"
/* lov_init: module entry point.  Prints a banner containing LOV_VERSION and
 * registers lov_obd_ops under OBD_LOV_DEVICENAME; the result of
 * class_register_type() is returned to the module loader. */
413 static int __init lov_init(void)
415 printk(KERN_INFO "Lustre Logical Object Volume driver " LOV_VERSION
416 ", phil@clusterfs.com\n");
417 return class_register_type(&lov_obd_ops, OBD_LOV_DEVICENAME);
/* lov_exit: module exit point.  Unregisters the device type that lov_init()
 * registered; the result of class_unregister_type() is not checked. */
420 static void __exit lov_exit(void)
422 class_unregister_type(OBD_LOV_DEVICENAME);
/* Module metadata and entry/exit registration.
 * NOTE(review): the description string repeats the version that is also
 * defined as LOV_VERSION; the two have to be kept in step by hand. */
425 MODULE_AUTHOR("Phil Schwan <phil@clusterfs.com>");
426 MODULE_DESCRIPTION("Lustre Logical Object Volume OBD driver v0.1");
427 MODULE_LICENSE("GPL");
429 module_init(lov_init);
430 module_exit(lov_exit);