Whamcloud - gitweb
cfbccecca28dc0270cbd7736a3c309c103dd6f29
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27 #include <linux/ctype.h>
28 #include <linux/init.h>
29 #include <linux/lustre_ha.h>
30 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
31 #include <linux/lustre_lite.h> /* for ll_i2info */
32
33 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, 
34                        struct lov_stripe_md *md)
35 {
36         struct ptlrpc_request *request;
37         struct ost_body *body;
38         int rc, size = sizeof(*body);
39         ENTRY;
40
41         request = ptlrpc_prep_req2(conn, OST_GETATTR, 1, &size, NULL);
42         if (!request)
43                 RETURN(-ENOMEM);
44
45         body = lustre_msg_buf(request->rq_reqmsg, 0);
46 #warning FIXME: pack only valid fields instead of memcpy, endianness
47         memcpy(&body->oa, oa, sizeof(*oa));
48
49         request->rq_replen = lustre_msg_size(1, &size);
50
51         rc = ptlrpc_queue_wait(request);
52         rc = ptlrpc_check_status(request, rc);
53         if (rc) {
54                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
55                 GOTO(out, rc);
56         }
57
58         body = lustre_msg_buf(request->rq_repmsg, 0);
59         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
60         if (oa)
61                 memcpy(oa, &body->oa, sizeof(*oa));
62
63         EXIT;
64  out:
65         ptlrpc_free_req(request);
66         return rc;
67 }
68
69 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
70                     struct lov_stripe_md *md)
71 {
72         struct ptlrpc_request *request;
73         struct ost_body *body;
74         int rc, size = sizeof(*body);
75         ENTRY;
76
77         request = ptlrpc_prep_req2(conn, OST_OPEN, 1, &size, NULL);
78         if (!request)
79                 RETURN(-ENOMEM);
80
81         body = lustre_msg_buf(request->rq_reqmsg, 0);
82 #warning FIXME: pack only valid fields instead of memcpy, endianness
83         memcpy(&body->oa, oa, sizeof(*oa));
84
85         request->rq_replen = lustre_msg_size(1, &size);
86
87         rc = ptlrpc_queue_wait(request);
88         rc = ptlrpc_check_status(request, rc);
89         if (rc)
90                 GOTO(out, rc);
91
92         body = lustre_msg_buf(request->rq_repmsg, 0);
93         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
94         if (oa)
95                 memcpy(oa, &body->oa, sizeof(*oa));
96
97         EXIT;
98  out:
99         ptlrpc_free_req(request);
100         return rc;
101 }
102
103 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
104                      struct lov_stripe_md *md)
105 {
106         struct ptlrpc_request *request;
107         struct ost_body *body;
108         int rc, size = sizeof(*body);
109         ENTRY;
110
111         request = ptlrpc_prep_req2(conn, OST_CLOSE, 1, &size, NULL);
112         if (!request)
113                 RETURN(-ENOMEM);
114
115         body = lustre_msg_buf(request->rq_reqmsg, 0);
116 #warning FIXME: pack only valid fields instead of memcpy, endianness
117         memcpy(&body->oa, oa, sizeof(*oa));
118
119         request->rq_replen = lustre_msg_size(1, &size);
120
121         rc = ptlrpc_queue_wait(request);
122         rc = ptlrpc_check_status(request, rc);
123         if (rc)
124                 GOTO(out, rc);
125
126         body = lustre_msg_buf(request->rq_repmsg, 0);
127         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
128         if (oa)
129                 memcpy(oa, &body->oa, sizeof(*oa));
130
131         EXIT;
132  out:
133         ptlrpc_free_req(request);
134         return rc;
135 }
136
137 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
138                        struct lov_stripe_md *md)
139 {
140         struct ptlrpc_request *request;
141         struct ost_body *body;
142         int rc, size = sizeof(*body);
143         ENTRY;
144
145         request = ptlrpc_prep_req2(conn, OST_SETATTR, 1, &size, NULL);
146         if (!request)
147                 RETURN(-ENOMEM);
148
149         body = lustre_msg_buf(request->rq_reqmsg, 0);
150         memcpy(&body->oa, oa, sizeof(*oa));
151
152         request->rq_replen = lustre_msg_size(1, &size);
153
154         rc = ptlrpc_queue_wait(request);
155         rc = ptlrpc_check_status(request, rc);
156         GOTO(out, rc);
157
158  out:
159         ptlrpc_free_req(request);
160         return rc;
161 }
162
163 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
164                       struct lov_stripe_md **ea)
165 {
166         struct ptlrpc_request *request;
167         struct ost_body *body;
168         int rc, size = sizeof(*body);
169         ENTRY;
170
171         if (!oa) {
172                 CERROR("oa NULL\n");
173                 RETURN(-EINVAL);
174         }
175
176         if (!ea) {
177                 LBUG();
178         }
179
180         if (!*ea) {
181                 OBD_ALLOC(*ea, oa->o_easize);
182                 if (!*ea)
183                         RETURN(-ENOMEM);
184                 (*ea)->lmd_easize = oa->o_easize;
185         }
186
187         request = ptlrpc_prep_req2(conn, OST_CREATE, 1, &size, NULL);
188         if (!request)
189                 RETURN(-ENOMEM);
190
191         body = lustre_msg_buf(request->rq_reqmsg, 0);
192         memcpy(&body->oa, oa, sizeof(*oa));
193
194         request->rq_replen = lustre_msg_size(1, &size);
195
196         rc = ptlrpc_queue_wait(request);
197         rc = ptlrpc_check_status(request, rc);
198         if (rc)
199                 GOTO(out, rc);
200
201         body = lustre_msg_buf(request->rq_repmsg, 0);
202         memcpy(oa, &body->oa, sizeof(*oa));
203
204         (*ea)->lmd_object_id = oa->o_id;
205         (*ea)->lmd_stripe_count = 1;
206         EXIT;
207  out:
208         ptlrpc_free_req(request);
209         return rc;
210 }
211
212 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
213                      struct lov_stripe_md *md, obd_size start,
214                      obd_size end)
215 {
216         struct ptlrpc_request *request;
217         struct ost_body *body;
218         int rc, size = sizeof(*body);
219         ENTRY;
220
221         if (!oa) {
222                 CERROR("oa NULL\n");
223                 RETURN(-EINVAL);
224         }
225
226         request = ptlrpc_prep_req2(conn, OST_PUNCH, 1, &size, NULL);
227         if (!request)
228                 RETURN(-ENOMEM);
229
230         body = lustre_msg_buf(request->rq_reqmsg, 0);
231 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
232         memcpy(&body->oa, oa, sizeof(*oa));
233
234         /* overload the blocks and size fields in the oa with start/end */ 
235 #warning FIXME: endianness, size=start, blocks=end?
236         body->oa.o_blocks = start;
237         body->oa.o_size = end;
238         body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
239
240         request->rq_replen = lustre_msg_size(1, &size);
241
242         rc = ptlrpc_queue_wait(request);
243         rc = ptlrpc_check_status(request, rc);
244         if (rc)
245                 GOTO(out, rc);
246
247         body = lustre_msg_buf(request->rq_repmsg, 0);
248         memcpy(oa, &body->oa, sizeof(*oa));
249
250         EXIT;
251  out:
252         ptlrpc_free_req(request);
253         return rc;
254 }
255
256 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
257                        struct lov_stripe_md *ea)
258 {
259         struct ptlrpc_request *request;
260         struct ost_body *body;
261         int rc, size = sizeof(*body);
262         ENTRY;
263
264         if (!oa) {
265                 CERROR("oa NULL\n");
266                 RETURN(-EINVAL);
267         }
268         request = ptlrpc_prep_req2(conn, OST_DESTROY, 1, &size, NULL);
269         if (!request)
270                 RETURN(-ENOMEM);
271
272         body = lustre_msg_buf(request->rq_reqmsg, 0);
273 #warning FIXME: pack only valid fields instead of memcpy, endianness
274         memcpy(&body->oa, oa, sizeof(*oa));
275
276         request->rq_replen = lustre_msg_size(1, &size);
277
278         rc = ptlrpc_queue_wait(request);
279         rc = ptlrpc_check_status(request, rc);
280         if (rc)
281                 GOTO(out, rc);
282
283         body = lustre_msg_buf(request->rq_repmsg, 0);
284         memcpy(oa, &body->oa, sizeof(*oa));
285
286         EXIT;
287  out:
288         ptlrpc_free_req(request);
289         return rc;
290 }
291
292 struct osc_brw_cb_data {
293         brw_callback_t callback;
294         void *cb_data;
295         void *obd_data;
296         size_t obd_size;
297 };
298
299 /* Our bulk-unmapping bottom half. */
300 static void unmap_and_decref_bulk_desc(void *data)
301 {
302         struct ptlrpc_bulk_desc *desc = data;
303         struct list_head *tmp;
304         ENTRY;
305
306         /* This feels wrong to me. */
307         list_for_each(tmp, &desc->b_page_list) {
308                 struct ptlrpc_bulk_page *bulk;
309                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
310
311                 kunmap(bulk->b_page);
312         }
313
314         ptlrpc_bulk_decref(desc);
315         EXIT;
316 }
317
318 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
319 {
320         struct osc_brw_cb_data *cb_data = data;
321         int err = 0;
322         ENTRY;
323
324         if (desc->b_flags & PTL_RPC_FL_TIMEOUT) {
325                 err = (desc->b_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS : 
326                        -ETIMEDOUT);
327         }
328
329         if (cb_data->callback)
330                 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
331
332         OBD_FREE(cb_data->obd_data, cb_data->obd_size);
333         OBD_FREE(cb_data, sizeof(*cb_data));
334
335         /* We can't kunmap the desc from interrupt context, so we do it from
336          * the bottom half above. */
337         INIT_TQUEUE(&desc->b_queue, 0, 0);
338         PREPARE_TQUEUE(&desc->b_queue, unmap_and_decref_bulk_desc, desc);
339         schedule_task(&desc->b_queue);
340
341         EXIT;
342 }
343
344 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
345                         obd_count page_count, struct brw_page *pga,
346                         brw_callback_t callback, struct io_cb_data *data)
347 {
348         struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
349         struct ptlrpc_request *request = NULL;
350         struct ptlrpc_bulk_desc *desc = NULL;
351         struct ost_body *body;
352         struct osc_brw_cb_data *cb_data = NULL;
353         int rc, size[3] = {sizeof(*body)};
354         void *iooptr, *nioptr;
355         int mapped = 0;
356         __u32 xid;
357         ENTRY;
358
359         size[1] = sizeof(struct obd_ioobj);
360         size[2] = page_count * sizeof(struct niobuf_remote);
361
362         request = ptlrpc_prep_req2(conn, OST_READ, 3, size, NULL);
363         if (!request)
364                 RETURN(-ENOMEM);
365
366         body = lustre_msg_buf(request->rq_reqmsg, 0);
367
368         desc = ptlrpc_prep_bulk(connection);
369         if (!desc)
370                 GOTO(out_req, rc = -ENOMEM);
371         desc->b_portal = OST_BULK_PORTAL;
372         desc->b_cb = brw_finish;
373         OBD_ALLOC(cb_data, sizeof(*cb_data));
374         if (!cb_data)
375                 GOTO(out_desc, rc = -ENOMEM);
376
377         cb_data->callback = callback;
378         cb_data->cb_data = data;
379         data->desc = desc;
380         desc->b_cb_data = cb_data;
381
382         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
383         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
384         ost_pack_ioo(&iooptr, md, page_count);
385         /* end almost identical to brw_write case */
386
387         spin_lock(&connection->c_lock);
388         xid = ++connection->c_xid_out;       /* single xid for all pages */
389         spin_unlock(&connection->c_lock);
390
391         for (mapped = 0; mapped < page_count; mapped++) {
392                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
393                 if (bulk == NULL)
394                         GOTO(out_unmap, rc = -ENOMEM);
395
396                 bulk->b_xid = xid;           /* single xid for all pages */
397
398                 bulk->b_buf = kmap(pga[mapped].pg);
399                 bulk->b_page = pga[mapped].pg;
400                 bulk->b_buflen = PAGE_SIZE;
401                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
402                                 pga[mapped].flag, bulk->b_xid);
403         }
404
405         /*
406          * Register the bulk first, because the reply could arrive out of order,
407          * and we want to be ready for the bulk data.
408          *
409          * The reference is released when brw_finish is complete.
410          *
411          * On error, we never do the brw_finish, so we handle all decrefs.
412          */
413         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
414                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
415                        OBD_FAIL_OSC_BRW_READ_BULK);
416         } else {
417                 rc = ptlrpc_register_bulk(desc);
418                 if (rc)
419                         GOTO(out_unmap, rc);
420         }
421
422         request->rq_replen = lustre_msg_size(1, size);
423         rc = ptlrpc_queue_wait(request);
424         rc = ptlrpc_check_status(request, rc);
425
426         /*
427          * XXX: If there is an error during the processing of the callback,
428          *      such as a timeout in a sleep that it performs, brw_finish
429          *      will never get called, and we'll leak the desc, fail to kunmap
430          *      things, cats will live with dogs.  One solution would be to
431          *      export brw_finish as osc_brw_finish, so that the timeout case and
432          *      its kin could call it for proper cleanup.  An alternative would
433          *      be for an error return from the callback to cause us to clean up,
434          *      but that doesn't help the truly async cases (like LOV), which
435          *      will immediately return from their PHASE_START callback, before
436          *      any such cleanup-requiring error condition can be detected.
437          */
438         if (rc)
439                 GOTO(out_req, rc);
440
441         /* Callbacks cause asynchronous handling. */
442         rc = callback(data, 0, CB_PHASE_START); 
443
444 out_req:
445         ptlrpc_req_finished(request);
446         RETURN(rc);
447
448         /* Clean up on error. */
449 out_unmap:
450         while (mapped-- > 0)
451                 kunmap(pga[mapped].pg);
452         OBD_FREE(cb_data, sizeof(*cb_data));
453 out_desc:
454         ptlrpc_bulk_decref(desc);
455         goto out_req;
456 }
457
458 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
459                          obd_count page_count, struct brw_page *pga,
460                          brw_callback_t callback, struct io_cb_data *data)
461 {
462         struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
463         struct ptlrpc_request *request = NULL;
464         struct ptlrpc_bulk_desc *desc = NULL;
465         struct ost_body *body;
466         struct niobuf_local *local = NULL;
467         struct niobuf_remote *remote;
468         struct osc_brw_cb_data *cb_data = NULL;
469         int rc, j, size[3] = {sizeof(*body)};
470         void *iooptr, *nioptr;
471         int mapped = 0;
472         ENTRY;
473
474         size[1] = sizeof(struct obd_ioobj);
475         size[2] = page_count * sizeof(*remote);
476
477         request = ptlrpc_prep_req2(conn, OST_WRITE, 3, size, NULL);
478         if (!request)
479                 RETURN(-ENOMEM);
480
481         body = lustre_msg_buf(request->rq_reqmsg, 0);
482
483         desc = ptlrpc_prep_bulk(connection);
484         if (!desc)
485                 GOTO(out_req, rc = -ENOMEM);
486         desc->b_portal = OSC_BULK_PORTAL;
487         desc->b_cb = brw_finish;
488         OBD_ALLOC(cb_data, sizeof(*cb_data));
489         if (!cb_data)
490                 GOTO(out_desc, rc = -ENOMEM);
491
492         cb_data->callback = callback;
493         cb_data->cb_data = data;
494         data->desc = desc;
495         desc->b_cb_data = cb_data;
496
497         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
498         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
499         ost_pack_ioo(&iooptr, md, page_count);
500         /* end almost identical to brw_read case */
501
502         OBD_ALLOC(local, page_count * sizeof(*local));
503         if (!local)
504                 GOTO(out_cb, rc = -ENOMEM);
505
506         cb_data->obd_data = local;
507         cb_data->obd_size = page_count * sizeof(*local);
508
509         for (mapped = 0; mapped < page_count; mapped++) {
510                 local[mapped].addr = kmap(pga[mapped].pg);
511                 local[mapped].offset = pga[mapped].off;
512                 local[mapped].len = pga[mapped].count;
513                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
514                                 pga[mapped].flag, 0);
515         }
516
517         size[1] = page_count * sizeof(*remote);
518         request->rq_replen = lustre_msg_size(2, size);
519         rc = ptlrpc_queue_wait(request);
520         rc = ptlrpc_check_status(request, rc);
521         if (rc)
522                 GOTO(out_unmap, rc);
523
524         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
525         if (!nioptr)
526                 GOTO(out_unmap, rc = -EINVAL);
527
528         if (request->rq_repmsg->buflens[1] != size[1]) {
529                 CERROR("buffer length wrong (%d vs. %d)\n",
530                        request->rq_repmsg->buflens[1], size[1]);
531                 GOTO(out_unmap, rc = -EINVAL);
532         }
533
534         for (j = 0; j < page_count; j++) {
535                 struct ptlrpc_bulk_page *bulk;
536
537                 ost_unpack_niobuf(&nioptr, &remote);
538
539                 bulk = ptlrpc_prep_bulk_page(desc);
540                 if (!bulk)
541                         GOTO(out_unmap, rc = -ENOMEM);
542
543                 bulk->b_buf = (void *)(unsigned long)local[j].addr;
544                 bulk->b_buflen = local[j].len;
545                 bulk->b_xid = remote->xid;
546                 bulk->b_page = pga[j].pg;
547         }
548
549         if (desc->b_page_count != page_count)
550                 LBUG();
551
552         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
553                 GOTO(out_unmap, rc = 0);
554
555         /* Our reference is released when brw_finish is complete. */
556         rc = ptlrpc_send_bulk(desc);
557
558         /* XXX: Mike, same question as in osc_brw_read. */
559         if (rc)
560                 GOTO(out_req, rc);
561
562         /* Callbacks cause asynchronous handling. */
563         rc = callback(data, 0, CB_PHASE_START);
564
565 out_req:
566         ptlrpc_req_finished(request);
567         RETURN(rc);
568
569         /* Clean up on error. */
570 out_unmap:
571         while (mapped-- > 0)
572                 kunmap(pga[mapped].pg);
573
574         OBD_FREE(local, page_count * sizeof(*local));
575 out_cb:
576         OBD_FREE(cb_data, sizeof(*cb_data));
577 out_desc:
578         ptlrpc_bulk_decref(desc);
579         goto out_req;
580 }
581
582 static int osc_brw(int cmd, struct lustre_handle *conn,
583                    struct lov_stripe_md *md, obd_count page_count,
584                    struct brw_page *pga, brw_callback_t callback,
585                    struct io_cb_data *data)
586 {
587         if (cmd & OBD_BRW_WRITE)
588                 return osc_brw_write(conn, md, page_count, pga, callback, data);
589         else
590                 return osc_brw_read(conn, md, page_count, pga, callback, data);
591 }
592
593 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md, 
594                        struct lustre_handle *parent_lock, 
595                        __u32 type, void *extentp, int extent_len, __u32 mode,
596                        int *flags, void *callback, void *data, int datalen,
597                        struct lustre_handle *lockh)
598 {
599         __u64 res_id[RES_NAME_SIZE] = { md->lmd_object_id };
600         struct obd_device *obddev = class_conn2obd(connh);
601         struct ldlm_extent *extent = extentp;
602         int rc;
603         __u32 mode2;
604
605         /* Filesystem locks are given a bit of special treatment: first we
606          * fixup the lock to start and end on page boundaries. */
607         extent->start &= PAGE_MASK;
608         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
609
610         /* Next, search for already existing extent locks that will cover us */
611         //osc_con2dlmcl(conn, &cl, &connection, &rconn);
612         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
613                              sizeof(extent), mode, lockh);
614         if (rc == 1) {
615                 /* We already have a lock, and it's referenced */
616                 return 0;
617         }
618
619         /* Next, search for locks that we can upgrade (if we're trying to write)
620          * or are more than we need (if we're trying to read).  Because the VFS
621          * and page cache already protect us locally, lots of readers/writers
622          * can share a single PW lock. */
623         if (mode == LCK_PW)
624                 mode2 = LCK_PR;
625         else
626                 mode2 = LCK_PW;
627
628         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
629                              sizeof(extent), mode2, lockh);
630         if (rc == 1) {
631                 int flags;
632                 /* FIXME: This is not incredibly elegant, but it might
633                  * be more elegant than adding another parameter to
634                  * lock_match.  I want a second opinion. */
635                 ldlm_lock_addref(lockh, mode);
636                 ldlm_lock_decref(lockh, mode2);
637
638                 if (mode == LCK_PR)
639                         return 0;
640
641                 rc = ldlm_cli_convert(lockh, mode, &flags);
642                 if (rc)
643                         LBUG();
644
645                 return rc;
646         }
647
648         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace,
649                               parent_lock, res_id, type, extent,
650                               sizeof(extent), mode, flags, ldlm_completion_ast,
651                               callback, data, datalen, lockh);
652         return rc;
653 }
654
655 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
656                       __u32 mode, struct lustre_handle *lockh)
657 {
658         ENTRY;
659
660         ldlm_lock_decref(lockh, mode);
661
662         RETURN(0);
663 }
664
665 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
666 {
667         struct ptlrpc_request *request;
668         struct obd_statfs *osfs;
669         int rc, size = sizeof(*osfs);
670         ENTRY;
671
672         request = ptlrpc_prep_req2(conn, OST_STATFS, 0, NULL, NULL);
673         if (!request)
674                 RETURN(-ENOMEM);
675
676         request->rq_replen = lustre_msg_size(1, &size);
677
678         rc = ptlrpc_queue_wait(request);
679         rc = ptlrpc_check_status(request, rc);
680         if (rc) {
681                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
682                 GOTO(out, rc);
683         }
684
685         osfs = lustre_msg_buf(request->rq_repmsg, 0);
686         obd_statfs_unpack(osfs, sfs);
687
688         EXIT;
689  out:
690         ptlrpc_free_req(request);
691         return rc;
692 }
693
694 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
695                          void *karg, void *uarg)
696 {
697         struct obd_device *obddev = class_conn2obd(conn);
698         struct obd_ioctl_data *data = karg;
699         int err = 0;
700         ENTRY;
701
702         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < 
703                         IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
704                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
705                         _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
706                 RETURN(-EINVAL);
707         }
708
709         switch (cmd) {
710         case IOC_LDLM_TEST: {
711                 err = ldlm_test(obddev, conn);
712                 CERROR("-- done err %d\n", err);
713                 GOTO(out, err);
714         }
715         case IOC_LDLM_REGRESS_START: {
716                 unsigned int numthreads = 1; 
717                 unsigned int numheld = 10; 
718                 unsigned int numres = 10; 
719                 unsigned int numext = 10;
720                 char *parse;
721                 
722                 if (data->ioc_inllen1) {
723                         parse = data->ioc_inlbuf1;
724                         if (*parse != '\0') {
725                                 while(isspace(*parse)) parse++;
726                                 numthreads = simple_strtoul(parse, &parse, 0);
727                                 while(isspace(*parse)) parse++;
728                         }
729                         if (*parse != '\0') {
730                                 while(isspace(*parse)) parse++;
731                                 numheld = simple_strtoul(parse, &parse, 0);
732                                 while(isspace(*parse)) parse++;
733                         }
734                         if (*parse != '\0') {
735                                 while(isspace(*parse)) parse++;
736                                 numres = simple_strtoul(parse, &parse, 0);
737                                 while(isspace(*parse)) parse++;
738                         }
739                         if (*parse != '\0') {
740                                 while(isspace(*parse)) parse++;
741                                 numext = simple_strtoul(parse, &parse, 0);
742                                 while(isspace(*parse)) parse++;
743                         }
744                 }
745
746                 err = ldlm_regression_start(obddev, conn, numthreads, 
747                                 numheld, numres, numext);
748
749                 CERROR("-- done err %d\n", err);
750                 GOTO(out, err);
751         }
752         case IOC_LDLM_REGRESS_STOP: {
753                 err = ldlm_regression_stop();
754                 CERROR("-- done err %d\n", err);
755                 GOTO(out, err);
756         }
757         default:
758                 GOTO(out, err = -EINVAL);
759         }
760 out:
761         return err;
762 }
763
764 struct obd_ops osc_obd_ops = {
765         o_setup:        client_obd_setup,
766         o_cleanup:      client_obd_cleanup,
767         o_statfs:       osc_statfs,
768         o_create:       osc_create,
769         o_destroy:      osc_destroy,
770         o_getattr:      osc_getattr,
771         o_setattr:      osc_setattr,
772         o_open:         osc_open,
773         o_close:        osc_close,
774         o_connect:      client_obd_connect,
775         o_disconnect:   client_obd_disconnect,
776         o_brw:          osc_brw,
777         o_punch:        osc_punch,
778         o_enqueue:      osc_enqueue,
779         o_cancel:       osc_cancel,
780         o_iocontrol:    osc_iocontrol
781 };
782
783 static int __init osc_init(void)
784 {
785         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
786 }
787
788 static void __exit osc_exit(void)
789 {
790         class_unregister_type(LUSTRE_OSC_NAME);
791 }
792
793 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
794 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
795 MODULE_LICENSE("GPL");
796
797 module_init(osc_init);
798 module_exit(osc_exit);