Whamcloud - gitweb
Fix strange non-complaining error for missing page_array and pagearray
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27 #include <linux/init.h>
28 #include <linux/lustre_ha.h>
29
30 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, 
31                        struct lov_stripe_md *md)
32 {
33         struct ptlrpc_request *request;
34         struct ost_body *body;
35         int rc, size = sizeof(*body);
36         ENTRY;
37
38         request = ptlrpc_prep_req2(conn, OST_GETATTR, 1, &size, NULL);
39         if (!request)
40                 RETURN(-ENOMEM);
41
42         body = lustre_msg_buf(request->rq_reqmsg, 0);
43 #warning FIXME: pack only valid fields instead of memcpy, endianness
44         memcpy(&body->oa, oa, sizeof(*oa));
45
46         request->rq_replen = lustre_msg_size(1, &size);
47
48         rc = ptlrpc_queue_wait(request);
49         rc = ptlrpc_check_status(request, rc);
50         if (rc) {
51                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
52                 GOTO(out, rc);
53         }
54
55         body = lustre_msg_buf(request->rq_repmsg, 0);
56         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
57         if (oa)
58                 memcpy(oa, &body->oa, sizeof(*oa));
59
60         EXIT;
61  out:
62         ptlrpc_free_req(request);
63         return rc;
64 }
65
66 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
67                     struct lov_stripe_md *md)
68 {
69         struct ptlrpc_request *request;
70         struct ost_body *body;
71         int rc, size = sizeof(*body);
72         ENTRY;
73
74         request = ptlrpc_prep_req2(conn, OST_OPEN, 1, &size, NULL);
75         if (!request)
76                 RETURN(-ENOMEM);
77
78         body = lustre_msg_buf(request->rq_reqmsg, 0);
79 #warning FIXME: pack only valid fields instead of memcpy, endianness
80         memcpy(&body->oa, oa, sizeof(*oa));
81
82         request->rq_replen = lustre_msg_size(1, &size);
83
84         rc = ptlrpc_queue_wait(request);
85         rc = ptlrpc_check_status(request, rc);
86         if (rc)
87                 GOTO(out, rc);
88
89         body = lustre_msg_buf(request->rq_repmsg, 0);
90         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
91         if (oa)
92                 memcpy(oa, &body->oa, sizeof(*oa));
93
94         EXIT;
95  out:
96         ptlrpc_free_req(request);
97         return rc;
98 }
99
100 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
101                      struct lov_stripe_md *md)
102 {
103         struct ptlrpc_request *request;
104         struct ost_body *body;
105         int rc, size = sizeof(*body);
106         ENTRY;
107
108         request = ptlrpc_prep_req2(conn, OST_CLOSE, 1, &size, NULL);
109         if (!request)
110                 RETURN(-ENOMEM);
111
112         body = lustre_msg_buf(request->rq_reqmsg, 0);
113 #warning FIXME: pack only valid fields instead of memcpy, endianness
114         memcpy(&body->oa, oa, sizeof(*oa));
115
116         request->rq_replen = lustre_msg_size(1, &size);
117
118         rc = ptlrpc_queue_wait(request);
119         rc = ptlrpc_check_status(request, rc);
120         if (rc)
121                 GOTO(out, rc);
122
123         body = lustre_msg_buf(request->rq_repmsg, 0);
124         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
125         if (oa)
126                 memcpy(oa, &body->oa, sizeof(*oa));
127
128         EXIT;
129  out:
130         ptlrpc_free_req(request);
131         return rc;
132 }
133
134 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
135                        struct lov_stripe_md *md)
136 {
137         struct ptlrpc_request *request;
138         struct ost_body *body;
139         int rc, size = sizeof(*body);
140         ENTRY;
141
142         request = ptlrpc_prep_req2(conn, OST_SETATTR, 1, &size, NULL);
143         if (!request)
144                 RETURN(-ENOMEM);
145
146         body = lustre_msg_buf(request->rq_reqmsg, 0);
147         memcpy(&body->oa, oa, sizeof(*oa));
148
149         request->rq_replen = lustre_msg_size(1, &size);
150
151         rc = ptlrpc_queue_wait(request);
152         rc = ptlrpc_check_status(request, rc);
153         GOTO(out, rc);
154
155  out:
156         ptlrpc_free_req(request);
157         return rc;
158 }
159
160 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
161                       struct lov_stripe_md **ea)
162 {
163         struct ptlrpc_request *request;
164         struct ost_body *body;
165         int rc, size = sizeof(*body);
166         ENTRY;
167
168         if (!oa) {
169                 CERROR("oa NULL\n");
170                 RETURN(-EINVAL);
171         }
172
173         if (!ea) {
174                 LBUG();
175         }
176
177         if (!*ea) {
178                 OBD_ALLOC(*ea, oa->o_easize);
179                 if (!*ea)
180                         RETURN(-ENOMEM);
181                 (*ea)->lmd_easize = oa->o_easize;
182         }
183
184         request = ptlrpc_prep_req2(conn, OST_CREATE, 1, &size, NULL);
185         if (!request)
186                 RETURN(-ENOMEM);
187
188         body = lustre_msg_buf(request->rq_reqmsg, 0);
189         memcpy(&body->oa, oa, sizeof(*oa));
190
191         request->rq_replen = lustre_msg_size(1, &size);
192
193         rc = ptlrpc_queue_wait(request);
194         rc = ptlrpc_check_status(request, rc);
195         if (rc)
196                 GOTO(out, rc);
197
198         body = lustre_msg_buf(request->rq_repmsg, 0);
199         memcpy(oa, &body->oa, sizeof(*oa));
200
201         (*ea)->lmd_object_id = oa->o_id;
202         (*ea)->lmd_stripe_count = 1;
203         EXIT;
204  out:
205         ptlrpc_free_req(request);
206         return rc;
207 }
208
209 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
210                      struct lov_stripe_md *md, obd_size start,
211                      obd_size end)
212 {
213         struct ptlrpc_request *request;
214         struct ost_body *body;
215         int rc, size = sizeof(*body);
216         ENTRY;
217
218         if (!oa) {
219                 CERROR("oa NULL\n");
220                 RETURN(-EINVAL);
221         }
222
223         request = ptlrpc_prep_req2(conn, OST_PUNCH, 1, &size, NULL);
224         if (!request)
225                 RETURN(-ENOMEM);
226
227         body = lustre_msg_buf(request->rq_reqmsg, 0);
228 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
229         memcpy(&body->oa, oa, sizeof(*oa));
230
231         /* overload the blocks and size fields in the oa with start/end */ 
232 #warning FIXME: endianness, size=start, blocks=end?
233         body->oa.o_blocks = start;
234         body->oa.o_size = end;
235         body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
236
237         request->rq_replen = lustre_msg_size(1, &size);
238
239         rc = ptlrpc_queue_wait(request);
240         rc = ptlrpc_check_status(request, rc);
241         if (rc)
242                 GOTO(out, rc);
243
244         body = lustre_msg_buf(request->rq_repmsg, 0);
245         memcpy(oa, &body->oa, sizeof(*oa));
246
247         EXIT;
248  out:
249         ptlrpc_free_req(request);
250         return rc;
251 }
252
253 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
254                        struct lov_stripe_md *ea)
255 {
256         struct ptlrpc_request *request;
257         struct ost_body *body;
258         int rc, size = sizeof(*body);
259         ENTRY;
260
261         if (!oa) {
262                 CERROR("oa NULL\n");
263                 RETURN(-EINVAL);
264         }
265         request = ptlrpc_prep_req2(conn, OST_DESTROY, 1, &size, NULL);
266         if (!request)
267                 RETURN(-ENOMEM);
268
269         body = lustre_msg_buf(request->rq_reqmsg, 0);
270 #warning FIXME: pack only valid fields instead of memcpy, endianness
271         memcpy(&body->oa, oa, sizeof(*oa));
272
273         request->rq_replen = lustre_msg_size(1, &size);
274
275         rc = ptlrpc_queue_wait(request);
276         rc = ptlrpc_check_status(request, rc);
277         if (rc)
278                 GOTO(out, rc);
279
280         body = lustre_msg_buf(request->rq_repmsg, 0);
281         memcpy(oa, &body->oa, sizeof(*oa));
282
283         EXIT;
284  out:
285         ptlrpc_free_req(request);
286         return rc;
287 }
288
289 struct osc_brw_cb_data {
290         brw_callback_t callback;
291         void *cb_data;
292         void *obd_data;
293         size_t obd_size;
294 };
295
296 /* Our bulk-unmapping bottom half. */
297 static void unmap_and_decref_bulk_desc(void *data)
298 {
299         struct ptlrpc_bulk_desc *desc = data;
300         struct list_head *tmp;
301         ENTRY;
302
303         /* This feels wrong to me. */
304         list_for_each(tmp, &desc->b_page_list) {
305                 struct ptlrpc_bulk_page *bulk;
306                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
307
308                 kunmap(bulk->b_page);
309         }
310
311         ptlrpc_bulk_decref(desc);
312         EXIT;
313 }
314
315 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
316 {
317         struct osc_brw_cb_data *cb_data = data;
318         int err = 0;
319         ENTRY;
320
321         if (desc->b_flags & PTL_RPC_FL_TIMEOUT) {
322                 err = (desc->b_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS : 
323                        -ETIMEDOUT);
324         }
325
326         if (cb_data->callback)
327                 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
328
329         OBD_FREE(cb_data->obd_data, cb_data->obd_size);
330         OBD_FREE(cb_data, sizeof(*cb_data));
331
332         /* We can't kunmap the desc from interrupt context, so we do it from
333          * the bottom half above. */
334         INIT_TQUEUE(&desc->b_queue, 0, 0);
335         PREPARE_TQUEUE(&desc->b_queue, unmap_and_decref_bulk_desc, desc);
336         schedule_task(&desc->b_queue);
337
338         EXIT;
339 }
340
341 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
342                         obd_count page_count, struct brw_page *pga,
343                         brw_callback_t callback, struct io_cb_data *data)
344 {
345         struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
346         struct ptlrpc_request *request = NULL;
347         struct ptlrpc_bulk_desc *desc = NULL;
348         struct ost_body *body;
349         struct osc_brw_cb_data *cb_data = NULL;
350         int rc, size[3] = {sizeof(*body)};
351         void *iooptr, *nioptr;
352         int mapped = 0;
353         __u32 xid;
354         ENTRY;
355
356         size[1] = sizeof(struct obd_ioobj);
357         size[2] = page_count * sizeof(struct niobuf_remote);
358
359         request = ptlrpc_prep_req2(conn, OST_READ, 3, size, NULL);
360         if (!request)
361                 RETURN(-ENOMEM);
362
363         body = lustre_msg_buf(request->rq_reqmsg, 0);
364
365         desc = ptlrpc_prep_bulk(connection);
366         if (!desc)
367                 GOTO(out_req, rc = -ENOMEM);
368         desc->b_portal = OST_BULK_PORTAL;
369         desc->b_cb = brw_finish;
370         OBD_ALLOC(cb_data, sizeof(*cb_data));
371         if (!cb_data)
372                 GOTO(out_desc, rc = -ENOMEM);
373
374         cb_data->callback = callback;
375         cb_data->cb_data = data;
376         data->desc = desc;
377         desc->b_cb_data = cb_data;
378
379         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
380         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
381         ost_pack_ioo(&iooptr, md, page_count);
382         /* end almost identical to brw_write case */
383
384         spin_lock(&connection->c_lock);
385         xid = ++connection->c_xid_out;       /* single xid for all pages */
386         spin_unlock(&connection->c_lock);
387
388         for (mapped = 0; mapped < page_count; mapped++) {
389                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
390                 if (bulk == NULL)
391                         GOTO(out_unmap, rc = -ENOMEM);
392
393                 bulk->b_xid = xid;           /* single xid for all pages */
394
395                 bulk->b_buf = kmap(pga[mapped].pg);
396                 bulk->b_page = pga[mapped].pg;
397                 bulk->b_buflen = PAGE_SIZE;
398                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
399                                 pga[mapped].flag, bulk->b_xid);
400         }
401
402         /*
403          * Register the bulk first, because the reply could arrive out of order,
404          * and we want to be ready for the bulk data.
405          *
406          * The reference is released when brw_finish is complete.
407          *
408          * On error, we never do the brw_finish, so we handle all decrefs.
409          */
410         rc = ptlrpc_register_bulk(desc);
411         if (rc)
412                 GOTO(out_unmap, rc);
413
414         request->rq_replen = lustre_msg_size(1, size);
415         rc = ptlrpc_queue_wait(request);
416         rc = ptlrpc_check_status(request, rc);
417
418         /* XXX: Mike, this is the only place I'm not sure of.  If we have
419          *      an error here, will we have always called brw_finish?  If no,
420          *      then out_req will not clean up and we should go to out_desc.
421          *      If maybe, then we are screwed, and we need to set things up
422          *      so that bulk_sink_callback is called for each bulk page,
423          *      even on error so brw_finish is always called.  It would need
424          *      to be passed an error code as a parameter to know what to do.
425          *
426          *      That would also help with the partial completion case, so
427          *      we could say in brw_finish "these pages are done, don't
428          *      restart them" and osc_brw callers can know this.
429          */
430         if (rc)
431                 GOTO(out_req, rc);
432
433         /* Callbacks cause asynchronous handling. */
434         rc = callback(data, 0, CB_PHASE_START); 
435
436 out_req:
437         ptlrpc_req_finished(request);
438         RETURN(rc);
439
440         /* Clean up on error. */
441 out_unmap:
442         while (mapped-- > 0)
443                 kunmap(pga[mapped]);
444         OBD_FREE(cb_data, sizeof(*cb_data));
445 out_desc:
446         ptlrpc_bulk_decref(desc);
447         goto out_req;
448 }
449
450 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
451                          obd_count page_count, struct brw_page *pga,
452                          brw_callback_t callback, struct io_cb_data *data)
453 {
454         struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
455         struct ptlrpc_request *request = NULL;
456         struct ptlrpc_bulk_desc *desc = NULL;
457         struct ost_body *body;
458         struct niobuf_local *local = NULL;
459         struct niobuf_remote *remote;
460         struct osc_brw_cb_data *cb_data = NULL;
461         int rc, j, size[3] = {sizeof(*body)};
462         void *iooptr, *nioptr;
463         int mapped = 0;
464         ENTRY;
465
466         size[1] = sizeof(struct obd_ioobj);
467         size[2] = page_count * sizeof(*remote);
468
469         request = ptlrpc_prep_req2(conn, OST_WRITE, 3, size, NULL);
470         if (!request)
471                 RETURN(-ENOMEM);
472
473         body = lustre_msg_buf(request->rq_reqmsg, 0);
474
475         desc = ptlrpc_prep_bulk(connection);
476         if (!desc)
477                 GOTO(out_req, rc = -ENOMEM);
478         desc->b_portal = OSC_BULK_PORTAL;
479         desc->b_cb = brw_finish;
480         OBD_ALLOC(cb_data, sizeof(*cb_data));
481         if (!cb_data)
482                 GOTO(out_desc, rc = -ENOMEM);
483
484         cb_data->callback = callback;
485         cb_data->cb_data = data;
486         data->desc = desc;
487         desc->b_cb_data = cb_data;
488
489         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
490         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
491         ost_pack_ioo(&iooptr, md, page_count);
492         /* end almost identical to brw_read case */
493
494         OBD_ALLOC(local, page_count * sizeof(*local));
495         if (!local)
496                 GOTO(out_cb, rc = -ENOMEM);
497
498         cb_data->obd_data = local;
499         cb_data->obd_size = page_count * sizeof(*local);
500
501         for (mapped = 0; mapped < page_count; mapped++) {
502                 local[mapped].addr = kmap(pga[mapped].pg);
503                 local[mapped].offset = pga[mapped].off;
504                 local[mapped].len = pga[mapped].count;
505                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
506                                 pga[mapped].flag, 0);
507         }
508
509         size[1] = page_count * sizeof(*remote);
510         request->rq_replen = lustre_msg_size(2, size);
511         rc = ptlrpc_queue_wait(request);
512         rc = ptlrpc_check_status(request, rc);
513         if (rc)
514                 GOTO(out_unmap, rc);
515
516         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
517         if (!nioptr)
518                 GOTO(out_unmap, rc = -EINVAL);
519
520         if (request->rq_repmsg->buflens[1] != size[1]) {
521                 CERROR("buffer length wrong (%d vs. %d)\n",
522                        request->rq_repmsg->buflens[1], size[1]);
523                 GOTO(out_unmap, rc = -EINVAL);
524         }
525
526         for (j = 0; j < page_count; j++) {
527                 struct ptlrpc_bulk_page *bulk;
528
529                 ost_unpack_niobuf(&nioptr, &remote);
530
531                 bulk = ptlrpc_prep_bulk_page(desc);
532                 if (!bulk)
533                         GOTO(out_unmap, rc = -ENOMEM);
534
535                 bulk->b_buf = (void *)(unsigned long)local[j].addr;
536                 bulk->b_buflen = local[j].len;
537                 bulk->b_xid = remote->xid;
538                 bulk->b_page = pga[j].pg;
539         }
540
541         if (desc->b_page_count != page_count)
542                 LBUG();
543
544         /* Our reference is released when brw_finish is complete. */
545         rc = ptlrpc_send_bulk(desc);
546
547         /* XXX: Mike, same question as in osc_brw_read. */
548         if (rc)
549                 GOTO(out_req, rc);
550
551         /* Callbacks cause asynchronous handling. */
552         rc = callback(data, 0, CB_PHASE_START);
553
554 out_req:
555         ptlrpc_req_finished(request);
556         RETURN(rc);
557
558         /* Clean up on error. */
559 out_unmap:
560         while (mapped-- > 0)
561                 kunmap(pga[mapped]);
562
563         OBD_FREE(local, page_count * sizeof(*local));
564 out_cb:
565         OBD_FREE(cb_data, sizeof(*cb_data));
566 out_desc:
567         ptlrpc_bulk_decref(desc);
568         goto out_req;
569 }
570
571 static int osc_brw(int cmd, struct lustre_handle *conn,
572                    struct lov_stripe_md *md, obd_count page_count,
573                    struct brw_page *pga, brw_callback_t callback,
574                    struct io_cb_data *data)
575 {
576         if (cmd & OBD_BRW_WRITE)
577                 return osc_brw_write(conn, md, page_count, pga, callback, data);
578         else
579                 return osc_brw_read(conn, md, page_count, pga, callback, data);
580 }
581
582 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md, 
583                        struct lustre_handle *parent_lock, 
584                        __u32 type, void *extentp, int extent_len, __u32 mode,
585                        int *flags, void *callback, void *data, int datalen,
586                        struct lustre_handle *lockh)
587 {
588         __u64 res_id[RES_NAME_SIZE] = { md->lmd_object_id };
589         struct obd_device *obddev = class_conn2obd(connh);
590         struct ldlm_extent *extent = extentp;
591         int rc;
592         __u32 mode2;
593
594         /* Filesystem locks are given a bit of special treatment: first we
595          * fixup the lock to start and end on page boundaries. */
596         extent->start &= PAGE_MASK;
597         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
598
599         /* Next, search for already existing extent locks that will cover us */
600         //osc_con2dlmcl(conn, &cl, &connection, &rconn);
601         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
602                              sizeof(extent), mode, lockh);
603         if (rc == 1) {
604                 /* We already have a lock, and it's referenced */
605                 return 0;
606         }
607
608         /* Next, search for locks that we can upgrade (if we're trying to write)
609          * or are more than we need (if we're trying to read).  Because the VFS
610          * and page cache already protect us locally, lots of readers/writers
611          * can share a single PW lock. */
612         if (mode == LCK_PW)
613                 mode2 = LCK_PR;
614         else
615                 mode2 = LCK_PW;
616
617         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
618                              sizeof(extent), mode2, lockh);
619         if (rc == 1) {
620                 int flags;
621                 /* FIXME: This is not incredibly elegant, but it might
622                  * be more elegant than adding another parameter to
623                  * lock_match.  I want a second opinion. */
624                 ldlm_lock_addref(lockh, mode);
625                 ldlm_lock_decref(lockh, mode2);
626
627                 if (mode == LCK_PR)
628                         return 0;
629
630                 rc = ldlm_cli_convert(lockh, mode, &flags);
631                 if (rc)
632                         LBUG();
633
634                 return rc;
635         }
636
637         rc = ldlm_cli_enqueue(connh, NULL,obddev->obd_namespace,
638                               parent_lock, res_id, type, extent,
639                               sizeof(extent), mode, flags, ldlm_completion_ast,
640                               callback, data, datalen, lockh);
641         return rc;
642 }
643
644 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
645                       __u32 mode, struct lustre_handle *lockh)
646 {
647         ENTRY;
648
649         ldlm_lock_decref(lockh, mode);
650
651         RETURN(0);
652 }
653
654 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
655 {
656         struct ptlrpc_request *request;
657         struct obd_statfs *osfs;
658         int rc, size = sizeof(*osfs);
659         ENTRY;
660
661         request = ptlrpc_prep_req2(conn, OST_STATFS, 0, NULL, NULL);
662         if (!request)
663                 RETURN(-ENOMEM);
664
665         request->rq_replen = lustre_msg_size(1, &size);
666
667         rc = ptlrpc_queue_wait(request);
668         rc = ptlrpc_check_status(request, rc);
669         if (rc) {
670                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
671                 GOTO(out, rc);
672         }
673
674         osfs = lustre_msg_buf(request->rq_repmsg, 0);
675         obd_statfs_unpack(osfs, sfs);
676
677         EXIT;
678  out:
679         ptlrpc_free_req(request);
680         return rc;
681 }
682
683 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
684                          void *karg, void *uarg)
685 {
686         struct obd_device *obddev = class_conn2obd(conn);
687         struct obd_ioctl_data *data = karg;
688         int err = 0;
689         ENTRY;
690
691         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < 
692                         IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
693                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
694                         _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
695                 RETURN(-EINVAL);
696         }
697
698         switch (cmd) {
699         case IOC_LDLM_TEST: {
700                 err = ldlm_test(obddev, conn);
701                 CERROR("-- done err %d\n", err);
702                 GOTO(out, err);
703         }
704         case IOC_LDLM_REGRESS_START: {
705                 unsigned int numthreads; 
706                 
707                 if (data->ioc_inllen1) 
708                         numthreads = simple_strtoul(data->ioc_inlbuf1, NULL, 0);
709                 else 
710                         numthreads = 1;
711
712                 err = ldlm_regression_start(obddev, conn, numthreads);
713                 CERROR("-- done err %d\n", err);
714                 GOTO(out, err);
715         }
716         case IOC_LDLM_REGRESS_STOP: {
717                 err = ldlm_regression_stop();
718                 CERROR("-- done err %d\n", err);
719                 GOTO(out, err);
720         }
721         default:
722                 GOTO(out, err = -EINVAL);
723         }
724 out:
725         return err;
726 }
727
728 struct obd_ops osc_obd_ops = {
729         o_setup:        client_obd_setup,
730         o_cleanup:      client_obd_cleanup,
731         o_statfs:       osc_statfs,
732         o_create:       osc_create,
733         o_destroy:      osc_destroy,
734         o_getattr:      osc_getattr,
735         o_setattr:      osc_setattr,
736         o_open:         osc_open,
737         o_close:        osc_close,
738         o_connect:      client_obd_connect,
739         o_disconnect:   client_obd_disconnect,
740         o_brw:          osc_brw,
741         o_punch:        osc_punch,
742         o_enqueue:      osc_enqueue,
743         o_cancel:       osc_cancel,
744         o_iocontrol:    osc_iocontrol
745 };
746
747 static int __init osc_init(void)
748 {
749         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
750 }
751
752 static void __exit osc_exit(void)
753 {
754         class_unregister_type(LUSTRE_OSC_NAME);
755 }
756
757 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
758 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
759 MODULE_LICENSE("GPL");
760
761 module_init(osc_init);
762 module_exit(osc_exit);