Whamcloud - gitweb
- removed lock list from inode info
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/module.h>
23 #include <linux/lustre_dlm.h>
24 #include <linux/lustre_mds.h> /* for mds_objid */
25 #include <linux/obd_ost.h>
26 #include <linux/obd_lov.h>
27 #include <linux/init.h>
28 #include <linux/lustre_ha.h>
29 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
30 #include <linux/lustre_lite.h> /* for ll_i2info */
31
32 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, 
33                        struct lov_stripe_md *md)
34 {
35         struct ptlrpc_request *request;
36         struct ost_body *body;
37         int rc, size = sizeof(*body);
38         ENTRY;
39
40         request = ptlrpc_prep_req2(conn, OST_GETATTR, 1, &size, NULL);
41         if (!request)
42                 RETURN(-ENOMEM);
43
44         body = lustre_msg_buf(request->rq_reqmsg, 0);
45 #warning FIXME: pack only valid fields instead of memcpy, endianness
46         memcpy(&body->oa, oa, sizeof(*oa));
47
48         request->rq_replen = lustre_msg_size(1, &size);
49
50         rc = ptlrpc_queue_wait(request);
51         rc = ptlrpc_check_status(request, rc);
52         if (rc) {
53                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
54                 GOTO(out, rc);
55         }
56
57         body = lustre_msg_buf(request->rq_repmsg, 0);
58         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
59         if (oa)
60                 memcpy(oa, &body->oa, sizeof(*oa));
61
62         EXIT;
63  out:
64         ptlrpc_free_req(request);
65         return rc;
66 }
67
68 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
69                     struct lov_stripe_md *md)
70 {
71         struct ptlrpc_request *request;
72         struct ost_body *body;
73         int rc, size = sizeof(*body);
74         ENTRY;
75
76         request = ptlrpc_prep_req2(conn, OST_OPEN, 1, &size, NULL);
77         if (!request)
78                 RETURN(-ENOMEM);
79
80         body = lustre_msg_buf(request->rq_reqmsg, 0);
81 #warning FIXME: pack only valid fields instead of memcpy, endianness
82         memcpy(&body->oa, oa, sizeof(*oa));
83
84         request->rq_replen = lustre_msg_size(1, &size);
85
86         rc = ptlrpc_queue_wait(request);
87         rc = ptlrpc_check_status(request, rc);
88         if (rc)
89                 GOTO(out, rc);
90
91         body = lustre_msg_buf(request->rq_repmsg, 0);
92         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
93         if (oa)
94                 memcpy(oa, &body->oa, sizeof(*oa));
95
96         EXIT;
97  out:
98         ptlrpc_free_req(request);
99         return rc;
100 }
101
102 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
103                      struct lov_stripe_md *md)
104 {
105         struct ptlrpc_request *request;
106         struct ost_body *body;
107         int rc, size = sizeof(*body);
108         ENTRY;
109
110         request = ptlrpc_prep_req2(conn, OST_CLOSE, 1, &size, NULL);
111         if (!request)
112                 RETURN(-ENOMEM);
113
114         body = lustre_msg_buf(request->rq_reqmsg, 0);
115 #warning FIXME: pack only valid fields instead of memcpy, endianness
116         memcpy(&body->oa, oa, sizeof(*oa));
117
118         request->rq_replen = lustre_msg_size(1, &size);
119
120         rc = ptlrpc_queue_wait(request);
121         rc = ptlrpc_check_status(request, rc);
122         if (rc)
123                 GOTO(out, rc);
124
125         body = lustre_msg_buf(request->rq_repmsg, 0);
126         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
127         if (oa)
128                 memcpy(oa, &body->oa, sizeof(*oa));
129
130         EXIT;
131  out:
132         ptlrpc_free_req(request);
133         return rc;
134 }
135
136 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
137                        struct lov_stripe_md *md)
138 {
139         struct ptlrpc_request *request;
140         struct ost_body *body;
141         int rc, size = sizeof(*body);
142         ENTRY;
143
144         request = ptlrpc_prep_req2(conn, OST_SETATTR, 1, &size, NULL);
145         if (!request)
146                 RETURN(-ENOMEM);
147
148         body = lustre_msg_buf(request->rq_reqmsg, 0);
149         memcpy(&body->oa, oa, sizeof(*oa));
150
151         request->rq_replen = lustre_msg_size(1, &size);
152
153         rc = ptlrpc_queue_wait(request);
154         rc = ptlrpc_check_status(request, rc);
155         GOTO(out, rc);
156
157  out:
158         ptlrpc_free_req(request);
159         return rc;
160 }
161
162 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
163                       struct lov_stripe_md **ea)
164 {
165         struct ptlrpc_request *request;
166         struct ost_body *body;
167         int rc, size = sizeof(*body);
168         ENTRY;
169
170         if (!oa) {
171                 CERROR("oa NULL\n");
172                 RETURN(-EINVAL);
173         }
174
175         if (!ea) {
176                 LBUG();
177         }
178
179         if (!*ea) {
180                 OBD_ALLOC(*ea, oa->o_easize);
181                 if (!*ea)
182                         RETURN(-ENOMEM);
183                 (*ea)->lmd_easize = oa->o_easize;
184         }
185
186         request = ptlrpc_prep_req2(conn, OST_CREATE, 1, &size, NULL);
187         if (!request)
188                 RETURN(-ENOMEM);
189
190         body = lustre_msg_buf(request->rq_reqmsg, 0);
191         memcpy(&body->oa, oa, sizeof(*oa));
192
193         request->rq_replen = lustre_msg_size(1, &size);
194
195         rc = ptlrpc_queue_wait(request);
196         rc = ptlrpc_check_status(request, rc);
197         if (rc)
198                 GOTO(out, rc);
199
200         body = lustre_msg_buf(request->rq_repmsg, 0);
201         memcpy(oa, &body->oa, sizeof(*oa));
202
203         (*ea)->lmd_object_id = oa->o_id;
204         (*ea)->lmd_stripe_count = 1;
205         EXIT;
206  out:
207         ptlrpc_free_req(request);
208         return rc;
209 }
210
211 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
212                      struct lov_stripe_md *md, obd_size start,
213                      obd_size end)
214 {
215         struct ptlrpc_request *request;
216         struct ost_body *body;
217         int rc, size = sizeof(*body);
218         ENTRY;
219
220         if (!oa) {
221                 CERROR("oa NULL\n");
222                 RETURN(-EINVAL);
223         }
224
225         request = ptlrpc_prep_req2(conn, OST_PUNCH, 1, &size, NULL);
226         if (!request)
227                 RETURN(-ENOMEM);
228
229         body = lustre_msg_buf(request->rq_reqmsg, 0);
230 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
231         memcpy(&body->oa, oa, sizeof(*oa));
232
233         /* overload the blocks and size fields in the oa with start/end */ 
234 #warning FIXME: endianness, size=start, blocks=end?
235         body->oa.o_blocks = start;
236         body->oa.o_size = end;
237         body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
238
239         request->rq_replen = lustre_msg_size(1, &size);
240
241         rc = ptlrpc_queue_wait(request);
242         rc = ptlrpc_check_status(request, rc);
243         if (rc)
244                 GOTO(out, rc);
245
246         body = lustre_msg_buf(request->rq_repmsg, 0);
247         memcpy(oa, &body->oa, sizeof(*oa));
248
249         EXIT;
250  out:
251         ptlrpc_free_req(request);
252         return rc;
253 }
254
255 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
256                        struct lov_stripe_md *ea)
257 {
258         struct ptlrpc_request *request;
259         struct ost_body *body;
260         int rc, size = sizeof(*body);
261         ENTRY;
262
263         if (!oa) {
264                 CERROR("oa NULL\n");
265                 RETURN(-EINVAL);
266         }
267         request = ptlrpc_prep_req2(conn, OST_DESTROY, 1, &size, NULL);
268         if (!request)
269                 RETURN(-ENOMEM);
270
271         body = lustre_msg_buf(request->rq_reqmsg, 0);
272 #warning FIXME: pack only valid fields instead of memcpy, endianness
273         memcpy(&body->oa, oa, sizeof(*oa));
274
275         request->rq_replen = lustre_msg_size(1, &size);
276
277         rc = ptlrpc_queue_wait(request);
278         rc = ptlrpc_check_status(request, rc);
279         if (rc)
280                 GOTO(out, rc);
281
282         body = lustre_msg_buf(request->rq_repmsg, 0);
283         memcpy(oa, &body->oa, sizeof(*oa));
284
285         EXIT;
286  out:
287         ptlrpc_free_req(request);
288         return rc;
289 }
290
291 struct osc_brw_cb_data {
292         brw_callback_t callback;
293         void *cb_data;
294         void *obd_data;
295         size_t obd_size;
296 };
297
298 /* Our bulk-unmapping bottom half. */
299 static void unmap_and_decref_bulk_desc(void *data)
300 {
301         struct ptlrpc_bulk_desc *desc = data;
302         struct list_head *tmp;
303         ENTRY;
304
305         /* This feels wrong to me. */
306         list_for_each(tmp, &desc->b_page_list) {
307                 struct ptlrpc_bulk_page *bulk;
308                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
309
310                 kunmap(bulk->b_page);
311         }
312
313         ptlrpc_bulk_decref(desc);
314         EXIT;
315 }
316
317 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
318 {
319         struct osc_brw_cb_data *cb_data = data;
320         int err = 0;
321         ENTRY;
322
323         if (desc->b_flags & PTL_RPC_FL_TIMEOUT) {
324                 err = (desc->b_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS : 
325                        -ETIMEDOUT);
326         }
327
328         if (cb_data->callback)
329                 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
330
331         OBD_FREE(cb_data->obd_data, cb_data->obd_size);
332         OBD_FREE(cb_data, sizeof(*cb_data));
333
334         /* We can't kunmap the desc from interrupt context, so we do it from
335          * the bottom half above. */
336         INIT_TQUEUE(&desc->b_queue, 0, 0);
337         PREPARE_TQUEUE(&desc->b_queue, unmap_and_decref_bulk_desc, desc);
338         schedule_task(&desc->b_queue);
339
340         EXIT;
341 }
342
343 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
344                         obd_count page_count, struct brw_page *pga,
345                         brw_callback_t callback, struct io_cb_data *data)
346 {
347         struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
348         struct ptlrpc_request *request = NULL;
349         struct ptlrpc_bulk_desc *desc = NULL;
350         struct ost_body *body;
351         struct osc_brw_cb_data *cb_data = NULL;
352         int rc, size[3] = {sizeof(*body)};
353         void *iooptr, *nioptr;
354         int mapped = 0;
355         __u32 xid;
356         ENTRY;
357
358         size[1] = sizeof(struct obd_ioobj);
359         size[2] = page_count * sizeof(struct niobuf_remote);
360
361         request = ptlrpc_prep_req2(conn, OST_READ, 3, size, NULL);
362         if (!request)
363                 RETURN(-ENOMEM);
364
365         body = lustre_msg_buf(request->rq_reqmsg, 0);
366
367         desc = ptlrpc_prep_bulk(connection);
368         if (!desc)
369                 GOTO(out_req, rc = -ENOMEM);
370         desc->b_portal = OST_BULK_PORTAL;
371         desc->b_cb = brw_finish;
372         OBD_ALLOC(cb_data, sizeof(*cb_data));
373         if (!cb_data)
374                 GOTO(out_desc, rc = -ENOMEM);
375
376         cb_data->callback = callback;
377         cb_data->cb_data = data;
378         data->desc = desc;
379         desc->b_cb_data = cb_data;
380
381         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
382         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
383         ost_pack_ioo(&iooptr, md, page_count);
384         /* end almost identical to brw_write case */
385
386         spin_lock(&connection->c_lock);
387         xid = ++connection->c_xid_out;       /* single xid for all pages */
388         spin_unlock(&connection->c_lock);
389
390         for (mapped = 0; mapped < page_count; mapped++) {
391                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
392                 if (bulk == NULL)
393                         GOTO(out_unmap, rc = -ENOMEM);
394
395                 bulk->b_xid = xid;           /* single xid for all pages */
396
397                 bulk->b_buf = kmap(pga[mapped].pg);
398                 bulk->b_page = pga[mapped].pg;
399                 bulk->b_buflen = PAGE_SIZE;
400                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
401                                 pga[mapped].flag, bulk->b_xid);
402         }
403
404         /*
405          * Register the bulk first, because the reply could arrive out of order,
406          * and we want to be ready for the bulk data.
407          *
408          * The reference is released when brw_finish is complete.
409          *
410          * On error, we never do the brw_finish, so we handle all decrefs.
411          */
412         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
413                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
414                        OBD_FAIL_OSC_BRW_READ_BULK);
415         } else {
416                 rc = ptlrpc_register_bulk(desc);
417                 if (rc)
418                         GOTO(out_unmap, rc);
419         }
420
421         request->rq_replen = lustre_msg_size(1, size);
422         rc = ptlrpc_queue_wait(request);
423         rc = ptlrpc_check_status(request, rc);
424
425         /*
426          * XXX: If there is an error during the processing of the callback,
427          *      such as a timeout in a sleep that it performs, brw_finish
428          *      will never get called, and we'll leak the desc, fail to kunmap
429          *      things, cats will live with dogs.  One solution would be to
430          *      export brw_finish as osc_brw_finish, so that the timeout case and
431          *      its kin could call it for proper cleanup.  An alternative would
432          *      be for an error return from the callback to cause us to clean up,
433          *      but that doesn't help the truly async cases (like LOV), which
434          *      will immediately return from their PHASE_START callback, before
435          *      any such cleanup-requiring error condition can be detected.
436          */
437         if (rc)
438                 GOTO(out_req, rc);
439
440         /* Callbacks cause asynchronous handling. */
441         rc = callback(data, 0, CB_PHASE_START); 
442
443 out_req:
444         ptlrpc_req_finished(request);
445         RETURN(rc);
446
447         /* Clean up on error. */
448 out_unmap:
449         while (mapped-- > 0)
450                 kunmap(pga[mapped].pg);
451         OBD_FREE(cb_data, sizeof(*cb_data));
452 out_desc:
453         ptlrpc_bulk_decref(desc);
454         goto out_req;
455 }
456
457 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
458                          obd_count page_count, struct brw_page *pga,
459                          brw_callback_t callback, struct io_cb_data *data)
460 {
461         struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
462         struct ptlrpc_request *request = NULL;
463         struct ptlrpc_bulk_desc *desc = NULL;
464         struct ost_body *body;
465         struct niobuf_local *local = NULL;
466         struct niobuf_remote *remote;
467         struct osc_brw_cb_data *cb_data = NULL;
468         int rc, j, size[3] = {sizeof(*body)};
469         void *iooptr, *nioptr;
470         int mapped = 0;
471         ENTRY;
472
473         size[1] = sizeof(struct obd_ioobj);
474         size[2] = page_count * sizeof(*remote);
475
476         request = ptlrpc_prep_req2(conn, OST_WRITE, 3, size, NULL);
477         if (!request)
478                 RETURN(-ENOMEM);
479
480         body = lustre_msg_buf(request->rq_reqmsg, 0);
481
482         desc = ptlrpc_prep_bulk(connection);
483         if (!desc)
484                 GOTO(out_req, rc = -ENOMEM);
485         desc->b_portal = OSC_BULK_PORTAL;
486         desc->b_cb = brw_finish;
487         OBD_ALLOC(cb_data, sizeof(*cb_data));
488         if (!cb_data)
489                 GOTO(out_desc, rc = -ENOMEM);
490
491         cb_data->callback = callback;
492         cb_data->cb_data = data;
493         data->desc = desc;
494         desc->b_cb_data = cb_data;
495
496         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
497         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
498         ost_pack_ioo(&iooptr, md, page_count);
499         /* end almost identical to brw_read case */
500
501         OBD_ALLOC(local, page_count * sizeof(*local));
502         if (!local)
503                 GOTO(out_cb, rc = -ENOMEM);
504
505         cb_data->obd_data = local;
506         cb_data->obd_size = page_count * sizeof(*local);
507
508         for (mapped = 0; mapped < page_count; mapped++) {
509                 local[mapped].addr = kmap(pga[mapped].pg);
510                 local[mapped].offset = pga[mapped].off;
511                 local[mapped].len = pga[mapped].count;
512                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
513                                 pga[mapped].flag, 0);
514         }
515
516         size[1] = page_count * sizeof(*remote);
517         request->rq_replen = lustre_msg_size(2, size);
518         rc = ptlrpc_queue_wait(request);
519         rc = ptlrpc_check_status(request, rc);
520         if (rc)
521                 GOTO(out_unmap, rc);
522
523         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
524         if (!nioptr)
525                 GOTO(out_unmap, rc = -EINVAL);
526
527         if (request->rq_repmsg->buflens[1] != size[1]) {
528                 CERROR("buffer length wrong (%d vs. %d)\n",
529                        request->rq_repmsg->buflens[1], size[1]);
530                 GOTO(out_unmap, rc = -EINVAL);
531         }
532
533         for (j = 0; j < page_count; j++) {
534                 struct ptlrpc_bulk_page *bulk;
535
536                 ost_unpack_niobuf(&nioptr, &remote);
537
538                 bulk = ptlrpc_prep_bulk_page(desc);
539                 if (!bulk)
540                         GOTO(out_unmap, rc = -ENOMEM);
541
542                 bulk->b_buf = (void *)(unsigned long)local[j].addr;
543                 bulk->b_buflen = local[j].len;
544                 bulk->b_xid = remote->xid;
545                 bulk->b_page = pga[j].pg;
546         }
547
548         if (desc->b_page_count != page_count)
549                 LBUG();
550
551         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
552                 GOTO(out_unmap, rc = 0);
553
554         /* Our reference is released when brw_finish is complete. */
555         rc = ptlrpc_send_bulk(desc);
556
557         /* XXX: Mike, same question as in osc_brw_read. */
558         if (rc)
559                 GOTO(out_req, rc);
560
561         /* Callbacks cause asynchronous handling. */
562         rc = callback(data, 0, CB_PHASE_START);
563
564 out_req:
565         ptlrpc_req_finished(request);
566         RETURN(rc);
567
568         /* Clean up on error. */
569 out_unmap:
570         while (mapped-- > 0)
571                 kunmap(pga[mapped].pg);
572
573         OBD_FREE(local, page_count * sizeof(*local));
574 out_cb:
575         OBD_FREE(cb_data, sizeof(*cb_data));
576 out_desc:
577         ptlrpc_bulk_decref(desc);
578         goto out_req;
579 }
580
581 static int osc_brw(int cmd, struct lustre_handle *conn,
582                    struct lov_stripe_md *md, obd_count page_count,
583                    struct brw_page *pga, brw_callback_t callback,
584                    struct io_cb_data *data)
585 {
586         if (cmd & OBD_BRW_WRITE)
587                 return osc_brw_write(conn, md, page_count, pga, callback, data);
588         else
589                 return osc_brw_read(conn, md, page_count, pga, callback, data);
590 }
591
592 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md, 
593                        struct lustre_handle *parent_lock, 
594                        __u32 type, void *extentp, int extent_len, __u32 mode,
595                        int *flags, void *callback, void *data, int datalen,
596                        struct lustre_handle *lockh)
597 {
598         __u64 res_id[RES_NAME_SIZE] = { md->lmd_object_id };
599         struct obd_device *obddev = class_conn2obd(connh);
600         struct ldlm_extent *extent = extentp;
601         int rc;
602         __u32 mode2;
603
604         /* Filesystem locks are given a bit of special treatment: first we
605          * fixup the lock to start and end on page boundaries. */
606         extent->start &= PAGE_MASK;
607         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
608
609         /* Next, search for already existing extent locks that will cover us */
610         //osc_con2dlmcl(conn, &cl, &connection, &rconn);
611         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
612                              sizeof(extent), mode, lockh);
613         if (rc == 1) {
614                 /* We already have a lock, and it's referenced */
615                 return 0;
616         }
617
618         /* Next, search for locks that we can upgrade (if we're trying to write)
619          * or are more than we need (if we're trying to read).  Because the VFS
620          * and page cache already protect us locally, lots of readers/writers
621          * can share a single PW lock. */
622         if (mode == LCK_PW)
623                 mode2 = LCK_PR;
624         else
625                 mode2 = LCK_PW;
626
627         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
628                              sizeof(extent), mode2, lockh);
629         if (rc == 1) {
630                 int flags;
631                 /* FIXME: This is not incredibly elegant, but it might
632                  * be more elegant than adding another parameter to
633                  * lock_match.  I want a second opinion. */
634                 ldlm_lock_addref(lockh, mode);
635                 ldlm_lock_decref(lockh, mode2);
636
637                 if (mode == LCK_PR)
638                         return 0;
639
640                 rc = ldlm_cli_convert(lockh, mode, &flags);
641                 if (rc)
642                         LBUG();
643
644                 return rc;
645         }
646
647         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace,
648                               parent_lock, res_id, type, extent,
649                               sizeof(extent), mode, flags, ldlm_completion_ast,
650                               callback, data, datalen, lockh);
651         return rc;
652 }
653
654 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
655                       __u32 mode, struct lustre_handle *lockh)
656 {
657         ENTRY;
658
659         ldlm_lock_decref(lockh, mode);
660
661         RETURN(0);
662 }
663
664 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
665 {
666         struct ptlrpc_request *request;
667         struct obd_statfs *osfs;
668         int rc, size = sizeof(*osfs);
669         ENTRY;
670
671         request = ptlrpc_prep_req2(conn, OST_STATFS, 0, NULL, NULL);
672         if (!request)
673                 RETURN(-ENOMEM);
674
675         request->rq_replen = lustre_msg_size(1, &size);
676
677         rc = ptlrpc_queue_wait(request);
678         rc = ptlrpc_check_status(request, rc);
679         if (rc) {
680                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
681                 GOTO(out, rc);
682         }
683
684         osfs = lustre_msg_buf(request->rq_repmsg, 0);
685         obd_statfs_unpack(osfs, sfs);
686
687         EXIT;
688  out:
689         ptlrpc_free_req(request);
690         return rc;
691 }
692
693 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
694                          void *karg, void *uarg)
695 {
696         struct obd_device *obddev = class_conn2obd(conn);
697         struct obd_ioctl_data *data = karg;
698         int err = 0;
699         ENTRY;
700
701         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < 
702                         IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
703                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
704                         _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
705                 RETURN(-EINVAL);
706         }
707
708         switch (cmd) {
709         case IOC_LDLM_TEST: {
710                 err = ldlm_test(obddev, conn);
711                 CERROR("-- done err %d\n", err);
712                 GOTO(out, err);
713         }
714         case IOC_LDLM_REGRESS_START: {
715                 unsigned int numthreads; 
716                 
717                 if (data->ioc_inllen1) 
718                         numthreads = simple_strtoul(data->ioc_inlbuf1, NULL, 0);
719                 else 
720                         numthreads = 1;
721
722                 err = ldlm_regression_start(obddev, conn, numthreads);
723                 CERROR("-- done err %d\n", err);
724                 GOTO(out, err);
725         }
726         case IOC_LDLM_REGRESS_STOP: {
727                 err = ldlm_regression_stop();
728                 CERROR("-- done err %d\n", err);
729                 GOTO(out, err);
730         }
731         default:
732                 GOTO(out, err = -EINVAL);
733         }
734 out:
735         return err;
736 }
737
738 struct obd_ops osc_obd_ops = {
739         o_setup:        client_obd_setup,
740         o_cleanup:      client_obd_cleanup,
741         o_statfs:       osc_statfs,
742         o_create:       osc_create,
743         o_destroy:      osc_destroy,
744         o_getattr:      osc_getattr,
745         o_setattr:      osc_setattr,
746         o_open:         osc_open,
747         o_close:        osc_close,
748         o_connect:      client_obd_connect,
749         o_disconnect:   client_obd_disconnect,
750         o_brw:          osc_brw,
751         o_punch:        osc_punch,
752         o_enqueue:      osc_enqueue,
753         o_cancel:       osc_cancel,
754         o_iocontrol:    osc_iocontrol
755 };
756
757 static int __init osc_init(void)
758 {
759         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
760 }
761
762 static void __exit osc_exit(void)
763 {
764         class_unregister_type(LUSTRE_OSC_NAME);
765 }
766
767 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
768 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
769 MODULE_LICENSE("GPL");
770
771 module_init(osc_init);
772 module_exit(osc_exit);