Whamcloud - gitweb
* l_wait_event can now do interrupts without a timeout, if we're feeling brave.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27 #include <linux/init.h>
28 #include <linux/lustre_ha.h>
29 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
30
31 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, 
32                        struct lov_stripe_md *md)
33 {
34         struct ptlrpc_request *request;
35         struct ost_body *body;
36         int rc, size = sizeof(*body);
37         ENTRY;
38
39         request = ptlrpc_prep_req2(conn, OST_GETATTR, 1, &size, NULL);
40         if (!request)
41                 RETURN(-ENOMEM);
42
43         body = lustre_msg_buf(request->rq_reqmsg, 0);
44 #warning FIXME: pack only valid fields instead of memcpy, endianness
45         memcpy(&body->oa, oa, sizeof(*oa));
46
47         request->rq_replen = lustre_msg_size(1, &size);
48
49         rc = ptlrpc_queue_wait(request);
50         rc = ptlrpc_check_status(request, rc);
51         if (rc) {
52                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
53                 GOTO(out, rc);
54         }
55
56         body = lustre_msg_buf(request->rq_repmsg, 0);
57         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
58         if (oa)
59                 memcpy(oa, &body->oa, sizeof(*oa));
60
61         EXIT;
62  out:
63         ptlrpc_free_req(request);
64         return rc;
65 }
66
67 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
68                     struct lov_stripe_md *md)
69 {
70         struct ptlrpc_request *request;
71         struct ost_body *body;
72         int rc, size = sizeof(*body);
73         ENTRY;
74
75         request = ptlrpc_prep_req2(conn, OST_OPEN, 1, &size, NULL);
76         if (!request)
77                 RETURN(-ENOMEM);
78
79         body = lustre_msg_buf(request->rq_reqmsg, 0);
80 #warning FIXME: pack only valid fields instead of memcpy, endianness
81         memcpy(&body->oa, oa, sizeof(*oa));
82
83         request->rq_replen = lustre_msg_size(1, &size);
84
85         rc = ptlrpc_queue_wait(request);
86         rc = ptlrpc_check_status(request, rc);
87         if (rc)
88                 GOTO(out, rc);
89
90         body = lustre_msg_buf(request->rq_repmsg, 0);
91         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
92         if (oa)
93                 memcpy(oa, &body->oa, sizeof(*oa));
94
95         EXIT;
96  out:
97         ptlrpc_free_req(request);
98         return rc;
99 }
100
101 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
102                      struct lov_stripe_md *md)
103 {
104         struct ptlrpc_request *request;
105         struct ost_body *body;
106         int rc, size = sizeof(*body);
107         ENTRY;
108
109         request = ptlrpc_prep_req2(conn, OST_CLOSE, 1, &size, NULL);
110         if (!request)
111                 RETURN(-ENOMEM);
112
113         body = lustre_msg_buf(request->rq_reqmsg, 0);
114 #warning FIXME: pack only valid fields instead of memcpy, endianness
115         memcpy(&body->oa, oa, sizeof(*oa));
116
117         request->rq_replen = lustre_msg_size(1, &size);
118
119         rc = ptlrpc_queue_wait(request);
120         rc = ptlrpc_check_status(request, rc);
121         if (rc)
122                 GOTO(out, rc);
123
124         body = lustre_msg_buf(request->rq_repmsg, 0);
125         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
126         if (oa)
127                 memcpy(oa, &body->oa, sizeof(*oa));
128
129         EXIT;
130  out:
131         ptlrpc_free_req(request);
132         return rc;
133 }
134
135 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
136                        struct lov_stripe_md *md)
137 {
138         struct ptlrpc_request *request;
139         struct ost_body *body;
140         int rc, size = sizeof(*body);
141         ENTRY;
142
143         request = ptlrpc_prep_req2(conn, OST_SETATTR, 1, &size, NULL);
144         if (!request)
145                 RETURN(-ENOMEM);
146
147         body = lustre_msg_buf(request->rq_reqmsg, 0);
148         memcpy(&body->oa, oa, sizeof(*oa));
149
150         request->rq_replen = lustre_msg_size(1, &size);
151
152         rc = ptlrpc_queue_wait(request);
153         rc = ptlrpc_check_status(request, rc);
154         GOTO(out, rc);
155
156  out:
157         ptlrpc_free_req(request);
158         return rc;
159 }
160
161 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
162                       struct lov_stripe_md **ea)
163 {
164         struct ptlrpc_request *request;
165         struct ost_body *body;
166         int rc, size = sizeof(*body);
167         ENTRY;
168
169         if (!oa) {
170                 CERROR("oa NULL\n");
171                 RETURN(-EINVAL);
172         }
173
174         if (!ea) {
175                 LBUG();
176         }
177
178         if (!*ea) {
179                 OBD_ALLOC(*ea, oa->o_easize);
180                 if (!*ea)
181                         RETURN(-ENOMEM);
182                 (*ea)->lmd_easize = oa->o_easize;
183         }
184
185         request = ptlrpc_prep_req2(conn, OST_CREATE, 1, &size, NULL);
186         if (!request)
187                 RETURN(-ENOMEM);
188
189         body = lustre_msg_buf(request->rq_reqmsg, 0);
190         memcpy(&body->oa, oa, sizeof(*oa));
191
192         request->rq_replen = lustre_msg_size(1, &size);
193
194         rc = ptlrpc_queue_wait(request);
195         rc = ptlrpc_check_status(request, rc);
196         if (rc)
197                 GOTO(out, rc);
198
199         body = lustre_msg_buf(request->rq_repmsg, 0);
200         memcpy(oa, &body->oa, sizeof(*oa));
201
202         (*ea)->lmd_object_id = oa->o_id;
203         (*ea)->lmd_stripe_count = 1;
204         EXIT;
205  out:
206         ptlrpc_free_req(request);
207         return rc;
208 }
209
210 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
211                      struct lov_stripe_md *md, obd_size start,
212                      obd_size end)
213 {
214         struct ptlrpc_request *request;
215         struct ost_body *body;
216         int rc, size = sizeof(*body);
217         ENTRY;
218
219         if (!oa) {
220                 CERROR("oa NULL\n");
221                 RETURN(-EINVAL);
222         }
223
224         request = ptlrpc_prep_req2(conn, OST_PUNCH, 1, &size, NULL);
225         if (!request)
226                 RETURN(-ENOMEM);
227
228         body = lustre_msg_buf(request->rq_reqmsg, 0);
229 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
230         memcpy(&body->oa, oa, sizeof(*oa));
231
232         /* overload the blocks and size fields in the oa with start/end */ 
233 #warning FIXME: endianness, size=start, blocks=end?
234         body->oa.o_blocks = start;
235         body->oa.o_size = end;
236         body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
237
238         request->rq_replen = lustre_msg_size(1, &size);
239
240         rc = ptlrpc_queue_wait(request);
241         rc = ptlrpc_check_status(request, rc);
242         if (rc)
243                 GOTO(out, rc);
244
245         body = lustre_msg_buf(request->rq_repmsg, 0);
246         memcpy(oa, &body->oa, sizeof(*oa));
247
248         EXIT;
249  out:
250         ptlrpc_free_req(request);
251         return rc;
252 }
253
254 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
255                        struct lov_stripe_md *ea)
256 {
257         struct ptlrpc_request *request;
258         struct ost_body *body;
259         int rc, size = sizeof(*body);
260         ENTRY;
261
262         if (!oa) {
263                 CERROR("oa NULL\n");
264                 RETURN(-EINVAL);
265         }
266         request = ptlrpc_prep_req2(conn, OST_DESTROY, 1, &size, NULL);
267         if (!request)
268                 RETURN(-ENOMEM);
269
270         body = lustre_msg_buf(request->rq_reqmsg, 0);
271 #warning FIXME: pack only valid fields instead of memcpy, endianness
272         memcpy(&body->oa, oa, sizeof(*oa));
273
274         request->rq_replen = lustre_msg_size(1, &size);
275
276         rc = ptlrpc_queue_wait(request);
277         rc = ptlrpc_check_status(request, rc);
278         if (rc)
279                 GOTO(out, rc);
280
281         body = lustre_msg_buf(request->rq_repmsg, 0);
282         memcpy(oa, &body->oa, sizeof(*oa));
283
284         EXIT;
285  out:
286         ptlrpc_free_req(request);
287         return rc;
288 }
289
290 struct osc_brw_cb_data {
291         brw_callback_t callback;
292         void *cb_data;
293         void *obd_data;
294         size_t obd_size;
295 };
296
297 /* Our bulk-unmapping bottom half. */
298 static void unmap_and_decref_bulk_desc(void *data)
299 {
300         struct ptlrpc_bulk_desc *desc = data;
301         struct list_head *tmp;
302         ENTRY;
303
304         /* This feels wrong to me. */
305         list_for_each(tmp, &desc->b_page_list) {
306                 struct ptlrpc_bulk_page *bulk;
307                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
308
309                 kunmap(bulk->b_page);
310         }
311
312         ptlrpc_bulk_decref(desc);
313         EXIT;
314 }
315
316 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
317 {
318         struct osc_brw_cb_data *cb_data = data;
319         int err = 0;
320         ENTRY;
321
322         if (desc->b_flags & PTL_RPC_FL_TIMEOUT) {
323                 err = (desc->b_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS : 
324                        -ETIMEDOUT);
325         }
326
327         if (cb_data->callback)
328                 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
329
330         OBD_FREE(cb_data->obd_data, cb_data->obd_size);
331         OBD_FREE(cb_data, sizeof(*cb_data));
332
333         /* We can't kunmap the desc from interrupt context, so we do it from
334          * the bottom half above. */
335         INIT_TQUEUE(&desc->b_queue, 0, 0);
336         PREPARE_TQUEUE(&desc->b_queue, unmap_and_decref_bulk_desc, desc);
337         schedule_task(&desc->b_queue);
338
339         EXIT;
340 }
341
342 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
343                         obd_count page_count, struct brw_page *pga,
344                         brw_callback_t callback, struct io_cb_data *data)
345 {
346         struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
347         struct ptlrpc_request *request = NULL;
348         struct ptlrpc_bulk_desc *desc = NULL;
349         struct ost_body *body;
350         struct osc_brw_cb_data *cb_data = NULL;
351         int rc, size[3] = {sizeof(*body)};
352         void *iooptr, *nioptr;
353         int mapped = 0;
354         __u32 xid;
355         ENTRY;
356
357         size[1] = sizeof(struct obd_ioobj);
358         size[2] = page_count * sizeof(struct niobuf_remote);
359
360         request = ptlrpc_prep_req2(conn, OST_READ, 3, size, NULL);
361         if (!request)
362                 RETURN(-ENOMEM);
363
364         body = lustre_msg_buf(request->rq_reqmsg, 0);
365
366         desc = ptlrpc_prep_bulk(connection);
367         if (!desc)
368                 GOTO(out_req, rc = -ENOMEM);
369         desc->b_portal = OST_BULK_PORTAL;
370         desc->b_cb = brw_finish;
371         OBD_ALLOC(cb_data, sizeof(*cb_data));
372         if (!cb_data)
373                 GOTO(out_desc, rc = -ENOMEM);
374
375         cb_data->callback = callback;
376         cb_data->cb_data = data;
377         data->desc = desc;
378         desc->b_cb_data = cb_data;
379
380         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
381         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
382         ost_pack_ioo(&iooptr, md, page_count);
383         /* end almost identical to brw_write case */
384
385         spin_lock(&connection->c_lock);
386         xid = ++connection->c_xid_out;       /* single xid for all pages */
387         spin_unlock(&connection->c_lock);
388
389         for (mapped = 0; mapped < page_count; mapped++) {
390                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
391                 if (bulk == NULL)
392                         GOTO(out_unmap, rc = -ENOMEM);
393
394                 bulk->b_xid = xid;           /* single xid for all pages */
395
396                 bulk->b_buf = kmap(pga[mapped].pg);
397                 bulk->b_page = pga[mapped].pg;
398                 bulk->b_buflen = PAGE_SIZE;
399                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
400                                 pga[mapped].flag, bulk->b_xid);
401         }
402
403         /*
404          * Register the bulk first, because the reply could arrive out of order,
405          * and we want to be ready for the bulk data.
406          *
407          * The reference is released when brw_finish is complete.
408          *
409          * On error, we never do the brw_finish, so we handle all decrefs.
410          */
411         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
412                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
413                        OBD_FAIL_OSC_BRW_READ_BULK);
414         } else {
415                 rc = ptlrpc_register_bulk(desc);
416                 if (rc)
417                         GOTO(out_unmap, rc);
418         }
419
420         request->rq_replen = lustre_msg_size(1, size);
421         rc = ptlrpc_queue_wait(request);
422         rc = ptlrpc_check_status(request, rc);
423
424         /*
425          * XXX: If there is an error during the processing of the callback,
426          *      such as a timeout in a sleep that it performs, brw_finish
427          *      will never get called, and we'll leak the desc, fail to kunmap
428          *      things, cats will live with dogs.  One solution would be to
429          *      export brw_finish as osc_brw_finish, so that the timeout case and
430          *      its kin could call it for proper cleanup.  An alternative would
431          *      be for an error return from the callback to cause us to clean up,
432          *      but that doesn't help the truly async cases (like LOV), which
433          *      will immediately return from their PHASE_START callback, before
434          *      any such cleanup-requiring error condition can be detected.
435          */
436         if (rc)
437                 GOTO(out_req, rc);
438
439         /* Callbacks cause asynchronous handling. */
440         rc = callback(data, 0, CB_PHASE_START); 
441
442 out_req:
443         ptlrpc_req_finished(request);
444         RETURN(rc);
445
446         /* Clean up on error. */
447 out_unmap:
448         while (mapped-- > 0)
449                 kunmap(pga[mapped].pg);
450         OBD_FREE(cb_data, sizeof(*cb_data));
451 out_desc:
452         ptlrpc_bulk_decref(desc);
453         goto out_req;
454 }
455
456 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
457                          obd_count page_count, struct brw_page *pga,
458                          brw_callback_t callback, struct io_cb_data *data)
459 {
460         struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
461         struct ptlrpc_request *request = NULL;
462         struct ptlrpc_bulk_desc *desc = NULL;
463         struct ost_body *body;
464         struct niobuf_local *local = NULL;
465         struct niobuf_remote *remote;
466         struct osc_brw_cb_data *cb_data = NULL;
467         int rc, j, size[3] = {sizeof(*body)};
468         void *iooptr, *nioptr;
469         int mapped = 0;
470         ENTRY;
471
472         size[1] = sizeof(struct obd_ioobj);
473         size[2] = page_count * sizeof(*remote);
474
475         request = ptlrpc_prep_req2(conn, OST_WRITE, 3, size, NULL);
476         if (!request)
477                 RETURN(-ENOMEM);
478
479         body = lustre_msg_buf(request->rq_reqmsg, 0);
480
481         desc = ptlrpc_prep_bulk(connection);
482         if (!desc)
483                 GOTO(out_req, rc = -ENOMEM);
484         desc->b_portal = OSC_BULK_PORTAL;
485         desc->b_cb = brw_finish;
486         OBD_ALLOC(cb_data, sizeof(*cb_data));
487         if (!cb_data)
488                 GOTO(out_desc, rc = -ENOMEM);
489
490         cb_data->callback = callback;
491         cb_data->cb_data = data;
492         data->desc = desc;
493         desc->b_cb_data = cb_data;
494
495         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
496         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
497         ost_pack_ioo(&iooptr, md, page_count);
498         /* end almost identical to brw_read case */
499
500         OBD_ALLOC(local, page_count * sizeof(*local));
501         if (!local)
502                 GOTO(out_cb, rc = -ENOMEM);
503
504         cb_data->obd_data = local;
505         cb_data->obd_size = page_count * sizeof(*local);
506
507         for (mapped = 0; mapped < page_count; mapped++) {
508                 local[mapped].addr = kmap(pga[mapped].pg);
509                 local[mapped].offset = pga[mapped].off;
510                 local[mapped].len = pga[mapped].count;
511                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
512                                 pga[mapped].flag, 0);
513         }
514
515         size[1] = page_count * sizeof(*remote);
516         request->rq_replen = lustre_msg_size(2, size);
517         rc = ptlrpc_queue_wait(request);
518         rc = ptlrpc_check_status(request, rc);
519         if (rc)
520                 GOTO(out_unmap, rc);
521
522         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
523         if (!nioptr)
524                 GOTO(out_unmap, rc = -EINVAL);
525
526         if (request->rq_repmsg->buflens[1] != size[1]) {
527                 CERROR("buffer length wrong (%d vs. %d)\n",
528                        request->rq_repmsg->buflens[1], size[1]);
529                 GOTO(out_unmap, rc = -EINVAL);
530         }
531
532         for (j = 0; j < page_count; j++) {
533                 struct ptlrpc_bulk_page *bulk;
534
535                 ost_unpack_niobuf(&nioptr, &remote);
536
537                 bulk = ptlrpc_prep_bulk_page(desc);
538                 if (!bulk)
539                         GOTO(out_unmap, rc = -ENOMEM);
540
541                 bulk->b_buf = (void *)(unsigned long)local[j].addr;
542                 bulk->b_buflen = local[j].len;
543                 bulk->b_xid = remote->xid;
544                 bulk->b_page = pga[j].pg;
545         }
546
547         if (desc->b_page_count != page_count)
548                 LBUG();
549
550         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
551                 GOTO(out_unmap, rc = 0);
552
553         /* Our reference is released when brw_finish is complete. */
554         rc = ptlrpc_send_bulk(desc);
555
556         /* XXX: Mike, same question as in osc_brw_read. */
557         if (rc)
558                 GOTO(out_req, rc);
559
560         /* Callbacks cause asynchronous handling. */
561         rc = callback(data, 0, CB_PHASE_START);
562
563 out_req:
564         ptlrpc_req_finished(request);
565         RETURN(rc);
566
567         /* Clean up on error. */
568 out_unmap:
569         while (mapped-- > 0)
570                 kunmap(pga[mapped].pg);
571
572         OBD_FREE(local, page_count * sizeof(*local));
573 out_cb:
574         OBD_FREE(cb_data, sizeof(*cb_data));
575 out_desc:
576         ptlrpc_bulk_decref(desc);
577         goto out_req;
578 }
579
580 static int osc_brw(int cmd, struct lustre_handle *conn,
581                    struct lov_stripe_md *md, obd_count page_count,
582                    struct brw_page *pga, brw_callback_t callback,
583                    struct io_cb_data *data)
584 {
585         if (cmd & OBD_BRW_WRITE)
586                 return osc_brw_write(conn, md, page_count, pga, callback, data);
587         else
588                 return osc_brw_read(conn, md, page_count, pga, callback, data);
589 }
590
591 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md, 
592                        struct lustre_handle *parent_lock, 
593                        __u32 type, void *extentp, int extent_len, __u32 mode,
594                        int *flags, void *callback, void *data, int datalen,
595                        struct lustre_handle *lockh)
596 {
597         __u64 res_id[RES_NAME_SIZE] = { md->lmd_object_id };
598         struct obd_device *obddev = class_conn2obd(connh);
599         struct ldlm_extent *extent = extentp;
600         int rc;
601         __u32 mode2;
602
603         /* Filesystem locks are given a bit of special treatment: first we
604          * fixup the lock to start and end on page boundaries. */
605         extent->start &= PAGE_MASK;
606         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
607
608         /* Next, search for already existing extent locks that will cover us */
609         //osc_con2dlmcl(conn, &cl, &connection, &rconn);
610         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
611                              sizeof(extent), mode, lockh);
612         if (rc == 1) {
613                 /* We already have a lock, and it's referenced */
614                 return 0;
615         }
616
617         /* Next, search for locks that we can upgrade (if we're trying to write)
618          * or are more than we need (if we're trying to read).  Because the VFS
619          * and page cache already protect us locally, lots of readers/writers
620          * can share a single PW lock. */
621         if (mode == LCK_PW)
622                 mode2 = LCK_PR;
623         else
624                 mode2 = LCK_PW;
625
626         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
627                              sizeof(extent), mode2, lockh);
628         if (rc == 1) {
629                 int flags;
630                 /* FIXME: This is not incredibly elegant, but it might
631                  * be more elegant than adding another parameter to
632                  * lock_match.  I want a second opinion. */
633                 ldlm_lock_addref(lockh, mode);
634                 ldlm_lock_decref(lockh, mode2);
635
636                 if (mode == LCK_PR)
637                         return 0;
638
639                 rc = ldlm_cli_convert(lockh, mode, &flags);
640                 if (rc)
641                         LBUG();
642
643                 return rc;
644         }
645
646         rc = ldlm_cli_enqueue(connh, NULL,obddev->obd_namespace,
647                               parent_lock, res_id, type, extent,
648                               sizeof(extent), mode, flags, ldlm_completion_ast,
649                               callback, data, datalen, lockh);
650         return rc;
651 }
652
653 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
654                       __u32 mode, struct lustre_handle *lockh)
655 {
656         ENTRY;
657
658         ldlm_lock_decref(lockh, mode);
659
660         RETURN(0);
661 }
662
663 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
664 {
665         struct ptlrpc_request *request;
666         struct obd_statfs *osfs;
667         int rc, size = sizeof(*osfs);
668         ENTRY;
669
670         request = ptlrpc_prep_req2(conn, OST_STATFS, 0, NULL, NULL);
671         if (!request)
672                 RETURN(-ENOMEM);
673
674         request->rq_replen = lustre_msg_size(1, &size);
675
676         rc = ptlrpc_queue_wait(request);
677         rc = ptlrpc_check_status(request, rc);
678         if (rc) {
679                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
680                 GOTO(out, rc);
681         }
682
683         osfs = lustre_msg_buf(request->rq_repmsg, 0);
684         obd_statfs_unpack(osfs, sfs);
685
686         EXIT;
687  out:
688         ptlrpc_free_req(request);
689         return rc;
690 }
691
692 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
693                          void *karg, void *uarg)
694 {
695         struct obd_device *obddev = class_conn2obd(conn);
696         struct obd_ioctl_data *data = karg;
697         int err = 0;
698         ENTRY;
699
700         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < 
701                         IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
702                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
703                         _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
704                 RETURN(-EINVAL);
705         }
706
707         switch (cmd) {
708         case IOC_LDLM_TEST: {
709                 err = ldlm_test(obddev, conn);
710                 CERROR("-- done err %d\n", err);
711                 GOTO(out, err);
712         }
713         case IOC_LDLM_REGRESS_START: {
714                 unsigned int numthreads; 
715                 
716                 if (data->ioc_inllen1) 
717                         numthreads = simple_strtoul(data->ioc_inlbuf1, NULL, 0);
718                 else 
719                         numthreads = 1;
720
721                 err = ldlm_regression_start(obddev, conn, numthreads);
722                 CERROR("-- done err %d\n", err);
723                 GOTO(out, err);
724         }
725         case IOC_LDLM_REGRESS_STOP: {
726                 err = ldlm_regression_stop();
727                 CERROR("-- done err %d\n", err);
728                 GOTO(out, err);
729         }
730         default:
731                 GOTO(out, err = -EINVAL);
732         }
733 out:
734         return err;
735 }
736
737 struct obd_ops osc_obd_ops = {
738         o_setup:        client_obd_setup,
739         o_cleanup:      client_obd_cleanup,
740         o_statfs:       osc_statfs,
741         o_create:       osc_create,
742         o_destroy:      osc_destroy,
743         o_getattr:      osc_getattr,
744         o_setattr:      osc_setattr,
745         o_open:         osc_open,
746         o_close:        osc_close,
747         o_connect:      client_obd_connect,
748         o_disconnect:   client_obd_disconnect,
749         o_brw:          osc_brw,
750         o_punch:        osc_punch,
751         o_enqueue:      osc_enqueue,
752         o_cancel:       osc_cancel,
753         o_iocontrol:    osc_iocontrol
754 };
755
756 static int __init osc_init(void)
757 {
758         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
759 }
760
761 static void __exit osc_exit(void)
762 {
763         class_unregister_type(LUSTRE_OSC_NAME);
764 }
765
766 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
767 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
768 MODULE_LICENSE("GPL");
769
770 module_init(osc_init);
771 module_exit(osc_exit);