Whamcloud - gitweb
Include version.h so we can check version.
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/version.h>
23 #include <linux/module.h>
24 #include <linux/mm.h>
25 #include <linux/highmem.h>
26 #include <linux/lustre_dlm.h>
27 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
28 #include <linux/workqueue.h>
29 #endif
30 #include <linux/kp30.h>
31 #include <linux/lustre_mds.h> /* for mds_objid */
32 #include <linux/obd_ost.h>
33 #include <linux/obd_lov.h>
34 #include <linux/ctype.h>
35 #include <linux/init.h>
36 #include <linux/lustre_ha.h>
37 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
38 #include <linux/lustre_lite.h> /* for ll_i2info */
39
40 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
41                        struct lov_stripe_md *md)
42 {
43         struct ptlrpc_request *request;
44         struct ost_body *body;
45         int rc, size = sizeof(*body);
46         ENTRY;
47
48         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
49                                   &size, NULL);
50         if (!request)
51                 RETURN(-ENOMEM);
52
53         body = lustre_msg_buf(request->rq_reqmsg, 0);
54 #warning FIXME: pack only valid fields instead of memcpy, endianness
55         memcpy(&body->oa, oa, sizeof(*oa));
56
57         request->rq_replen = lustre_msg_size(1, &size);
58
59         rc = ptlrpc_queue_wait(request);
60         rc = ptlrpc_check_status(request, rc);
61         if (rc) {
62                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
63                 GOTO(out, rc);
64         }
65
66         body = lustre_msg_buf(request->rq_repmsg, 0);
67         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
68         if (oa)
69                 memcpy(oa, &body->oa, sizeof(*oa));
70
71         EXIT;
72  out:
73         ptlrpc_req_finished(request);
74         return rc;
75 }
76
77 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
78                     struct lov_stripe_md *md)
79 {
80         struct ptlrpc_request *request;
81         struct ost_body *body;
82         int rc, size = sizeof(*body);
83         ENTRY;
84
85         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
86                                   NULL);
87         if (!request)
88                 RETURN(-ENOMEM);
89
90         body = lustre_msg_buf(request->rq_reqmsg, 0);
91 #warning FIXME: pack only valid fields instead of memcpy, endianness
92         memcpy(&body->oa, oa, sizeof(*oa));
93
94         request->rq_replen = lustre_msg_size(1, &size);
95
96         rc = ptlrpc_queue_wait(request);
97         rc = ptlrpc_check_status(request, rc);
98         if (rc)
99                 GOTO(out, rc);
100
101         body = lustre_msg_buf(request->rq_repmsg, 0);
102         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
103         if (oa)
104                 memcpy(oa, &body->oa, sizeof(*oa));
105
106         EXIT;
107  out:
108         ptlrpc_req_finished(request);
109         return rc;
110 }
111
112 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
113                      struct lov_stripe_md *md)
114 {
115         struct ptlrpc_request *request;
116         struct ost_body *body;
117         int rc, size = sizeof(*body);
118         ENTRY;
119
120         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
121                                   NULL);
122         if (!request)
123                 RETURN(-ENOMEM);
124
125         body = lustre_msg_buf(request->rq_reqmsg, 0);
126 #warning FIXME: pack only valid fields instead of memcpy, endianness
127         memcpy(&body->oa, oa, sizeof(*oa));
128
129         request->rq_replen = lustre_msg_size(1, &size);
130
131         rc = ptlrpc_queue_wait(request);
132         rc = ptlrpc_check_status(request, rc);
133         if (rc)
134                 GOTO(out, rc);
135
136         body = lustre_msg_buf(request->rq_repmsg, 0);
137         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
138         if (oa)
139                 memcpy(oa, &body->oa, sizeof(*oa));
140
141         EXIT;
142  out:
143         ptlrpc_req_finished(request);
144         return rc;
145 }
146
147 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
148                        struct lov_stripe_md *md)
149 {
150         struct ptlrpc_request *request;
151         struct ost_body *body;
152         int rc, size = sizeof(*body);
153         ENTRY;
154
155         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
156                                   &size, NULL);
157         if (!request)
158                 RETURN(-ENOMEM);
159
160         body = lustre_msg_buf(request->rq_reqmsg, 0);
161         memcpy(&body->oa, oa, sizeof(*oa));
162
163         request->rq_replen = lustre_msg_size(1, &size);
164
165         rc = ptlrpc_queue_wait(request);
166         rc = ptlrpc_check_status(request, rc);
167
168         ptlrpc_req_finished(request);
169         return rc;
170 }
171
172 static int osc_create(struct lustre_handle *conn, struct obdo *oa,
173                       struct lov_stripe_md **ea)
174 {
175         struct ptlrpc_request *request;
176         struct ost_body *body;
177         struct lov_stripe_md *lsm;
178         int rc, size = sizeof(*body);
179         ENTRY;
180
181         LASSERT(oa);
182         LASSERT(ea);
183
184         lsm = *ea;
185         if (!lsm) {
186                 // XXX check oa->o_valid & OBD_MD_FLEASIZE first...
187                 OBD_ALLOC(lsm, oa->o_easize);
188                 if (!lsm)
189                         RETURN(-ENOMEM);
190                 lsm->lsm_mds_easize = oa->o_easize;
191         }
192
193         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
194                                   NULL);
195         if (!request)
196                 GOTO(out, rc = -ENOMEM);
197
198         body = lustre_msg_buf(request->rq_reqmsg, 0);
199         memcpy(&body->oa, oa, sizeof(*oa));
200
201         request->rq_replen = lustre_msg_size(1, &size);
202
203         rc = ptlrpc_queue_wait(request);
204         rc = ptlrpc_check_status(request, rc);
205         if (rc)
206                 GOTO(out_req, rc);
207
208         body = lustre_msg_buf(request->rq_repmsg, 0);
209         memcpy(oa, &body->oa, sizeof(*oa));
210
211         lsm->lsm_object_id = oa->o_id;
212         lsm->lsm_stripe_count = 0;
213         *ea = lsm;
214         EXIT;
215 out_req:
216         ptlrpc_req_finished(request);
217 out:
218         if (rc && !*ea)
219                 OBD_FREE(lsm, oa->o_easize);
220         return rc;
221 }
222
223 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
224                      struct lov_stripe_md *md, obd_size start,
225                      obd_size end)
226 {
227         struct ptlrpc_request *request;
228         struct ost_body *body;
229         int rc, size = sizeof(*body);
230         ENTRY;
231
232         if (!oa) {
233                 CERROR("oa NULL\n");
234                 RETURN(-EINVAL);
235         }
236
237         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
238                                   NULL);
239         if (!request)
240                 RETURN(-ENOMEM);
241
242         body = lustre_msg_buf(request->rq_reqmsg, 0);
243 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
244         memcpy(&body->oa, oa, sizeof(*oa));
245
246         /* overload the blocks and size fields in the oa with start/end */
247 #warning FIXME: endianness, size=start, blocks=end?
248         body->oa.o_blocks = start;
249         body->oa.o_size = end;
250         body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
251
252         request->rq_replen = lustre_msg_size(1, &size);
253
254         rc = ptlrpc_queue_wait(request);
255         rc = ptlrpc_check_status(request, rc);
256         if (rc)
257                 GOTO(out, rc);
258
259         body = lustre_msg_buf(request->rq_repmsg, 0);
260         memcpy(oa, &body->oa, sizeof(*oa));
261
262         EXIT;
263  out:
264         ptlrpc_req_finished(request);
265         return rc;
266 }
267
268 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
269                        struct lov_stripe_md *ea)
270 {
271         struct ptlrpc_request *request;
272         struct ost_body *body;
273         int rc, size = sizeof(*body);
274         ENTRY;
275
276         if (!oa) {
277                 CERROR("oa NULL\n");
278                 RETURN(-EINVAL);
279         }
280         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
281                                   &size, NULL);
282         if (!request)
283                 RETURN(-ENOMEM);
284
285         body = lustre_msg_buf(request->rq_reqmsg, 0);
286 #warning FIXME: pack only valid fields instead of memcpy, endianness
287         memcpy(&body->oa, oa, sizeof(*oa));
288
289         request->rq_replen = lustre_msg_size(1, &size);
290
291         rc = ptlrpc_queue_wait(request);
292         rc = ptlrpc_check_status(request, rc);
293         if (rc)
294                 GOTO(out, rc);
295
296         body = lustre_msg_buf(request->rq_repmsg, 0);
297         memcpy(oa, &body->oa, sizeof(*oa));
298
299         EXIT;
300  out:
301         ptlrpc_req_finished(request);
302         return rc;
303 }
304
305 struct osc_brw_cb_data {
306         brw_callback_t callback;
307         void *cb_data;
308         void *obd_data;
309         size_t obd_size;
310 };
311
312 /* Our bulk-unmapping bottom half. */
313 static void unmap_and_decref_bulk_desc(void *data)
314 {
315         struct ptlrpc_bulk_desc *desc = data;
316         struct list_head *tmp;
317         ENTRY;
318
319         /* This feels wrong to me. */
320         list_for_each(tmp, &desc->bd_page_list) {
321                 struct ptlrpc_bulk_page *bulk;
322                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
323
324                 kunmap(bulk->bp_page);
325         }
326
327         ptlrpc_bulk_decref(desc);
328         EXIT;
329 }
330
331 static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
332 {
333         struct osc_brw_cb_data *cb_data = data;
334         int err = 0;
335         ENTRY;
336
337         if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
338                 err = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
339                        -ETIMEDOUT);
340         }
341
342         if (cb_data->callback)
343                 cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
344
345         if (cb_data->obd_data)
346                 OBD_FREE(cb_data->obd_data, cb_data->obd_size);
347         OBD_FREE(cb_data, sizeof(*cb_data));
348
349         /* We can't kunmap the desc from interrupt context, so we do it from
350          * the bottom half above. */
351         prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
352         schedule_work(&desc->bd_queue);
353
354         EXIT;
355 }
356
357 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
358                         obd_count page_count, struct brw_page *pga,
359                         brw_callback_t callback, struct io_cb_data *data)
360 {
361         struct ptlrpc_connection *connection =
362                 client_conn2cli(conn)->cl_import.imp_connection;
363         struct ptlrpc_request *request = NULL;
364         struct ptlrpc_bulk_desc *desc = NULL;
365         struct ost_body *body;
366         struct osc_brw_cb_data *cb_data = NULL;
367         int rc, size[3] = {sizeof(*body)};
368         void *iooptr, *nioptr;
369         int mapped = 0;
370         __u32 xid;
371         ENTRY;
372
373         size[1] = sizeof(struct obd_ioobj);
374         size[2] = page_count * sizeof(struct niobuf_remote);
375
376         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
377                                   NULL);
378         if (!request)
379                 RETURN(-ENOMEM);
380
381         body = lustre_msg_buf(request->rq_reqmsg, 0);
382
383         desc = ptlrpc_prep_bulk(connection);
384         if (!desc)
385                 GOTO(out_req, rc = -ENOMEM);
386         desc->bd_portal = OST_BULK_PORTAL;
387         desc->bd_cb = brw_finish;
388         OBD_ALLOC(cb_data, sizeof(*cb_data));
389         if (!cb_data)
390                 GOTO(out_desc, rc = -ENOMEM);
391
392         cb_data->callback = callback;
393         cb_data->cb_data = data;
394         data->desc = desc;
395         desc->bd_cb_data = cb_data;
396
397         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
398         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
399         ost_pack_ioo(&iooptr, lsm, page_count);
400         /* end almost identical to brw_write case */
401
402         spin_lock(&connection->c_lock);
403         xid = ++connection->c_xid_out;       /* single xid for all pages */
404         spin_unlock(&connection->c_lock);
405
406         for (mapped = 0; mapped < page_count; mapped++) {
407                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
408                 if (bulk == NULL)
409                         GOTO(out_unmap, rc = -ENOMEM);
410
411                 bulk->bp_xid = xid;           /* single xid for all pages */
412
413                 bulk->bp_buf = kmap(pga[mapped].pg);
414                 bulk->bp_page = pga[mapped].pg;
415                 bulk->bp_buflen = PAGE_SIZE;
416                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
417                                 pga[mapped].flag, bulk->bp_xid);
418         }
419
420         /*
421          * Register the bulk first, because the reply could arrive out of order,
422          * and we want to be ready for the bulk data.
423          *
424          * The reference is released when brw_finish is complete.
425          *
426          * On error, we never do the brw_finish, so we handle all decrefs.
427          */
428         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
429                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
430                        OBD_FAIL_OSC_BRW_READ_BULK);
431         } else {
432                 rc = ptlrpc_register_bulk(desc);
433                 if (rc)
434                         GOTO(out_unmap, rc);
435         }
436
437         request->rq_replen = lustre_msg_size(1, size);
438         rc = ptlrpc_queue_wait(request);
439         rc = ptlrpc_check_status(request, rc);
440
441         /*
442          * XXX: If there is an error during the processing of the callback,
443          *      such as a timeout in a sleep that it performs, brw_finish
444          *      will never get called, and we'll leak the desc, fail to kunmap
445          *      things, cats will live with dogs.  One solution would be to
446          *      export brw_finish as osc_brw_finish, so that the timeout case
447          *      and its kin could call it for proper cleanup.  An alternative
448          *      would be for an error return from the callback to cause us to
449          *      clean up, but that doesn't help the truly async cases (like
450          *      LOV), which will immediately return from their PHASE_START
451          *      callback, before any such cleanup-requiring error condition can
452          *      be detected.
453          */
454         if (rc)
455                 GOTO(out_req, rc);
456
457         /* Callbacks cause asynchronous handling. */
458         rc = callback(data, 0, CB_PHASE_START);
459
460 out_req:
461         ptlrpc_req_finished(request);
462         RETURN(rc);
463
464         /* Clean up on error. */
465 out_unmap:
466         while (mapped-- > 0)
467                 kunmap(pga[mapped].pg);
468         OBD_FREE(cb_data, sizeof(*cb_data));
469 out_desc:
470         ptlrpc_bulk_decref(desc);
471         goto out_req;
472 }
473
474 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
475                          obd_count page_count, struct brw_page *pga,
476                          brw_callback_t callback, struct io_cb_data *data)
477 {
478         struct ptlrpc_connection *connection =
479                 client_conn2cli(conn)->cl_import.imp_connection;
480         struct ptlrpc_request *request = NULL;
481         struct ptlrpc_bulk_desc *desc = NULL;
482         struct ost_body *body;
483         struct niobuf_local *local = NULL;
484         struct niobuf_remote *remote;
485         struct osc_brw_cb_data *cb_data = NULL;
486         int rc, j, size[3] = {sizeof(*body)};
487         void *iooptr, *nioptr;
488         int mapped = 0;
489         ENTRY;
490
491         size[1] = sizeof(struct obd_ioobj);
492         size[2] = page_count * sizeof(*remote);
493
494         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
495                                   NULL);
496         if (!request)
497                 RETURN(-ENOMEM);
498
499         body = lustre_msg_buf(request->rq_reqmsg, 0);
500
501         desc = ptlrpc_prep_bulk(connection);
502         if (!desc)
503                 GOTO(out_req, rc = -ENOMEM);
504         desc->bd_portal = OSC_BULK_PORTAL;
505         desc->bd_cb = brw_finish;
506         OBD_ALLOC(cb_data, sizeof(*cb_data));
507         if (!cb_data)
508                 GOTO(out_desc, rc = -ENOMEM);
509
510         cb_data->callback = callback;
511         cb_data->cb_data = data;
512         data->desc = desc;
513         desc->bd_cb_data = cb_data;
514
515         iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
516         nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
517         ost_pack_ioo(&iooptr, md, page_count);
518         /* end almost identical to brw_read case */
519
520         OBD_ALLOC(local, page_count * sizeof(*local));
521         if (!local)
522                 GOTO(out_cb, rc = -ENOMEM);
523
524         cb_data->obd_data = local;
525         cb_data->obd_size = page_count * sizeof(*local);
526
527         for (mapped = 0; mapped < page_count; mapped++) {
528                 local[mapped].addr = kmap(pga[mapped].pg);
529
530                 CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
531                        "%d ; page %d of %d\n",
532                        local[mapped].addr, pga[mapped].pg->flags,
533                        page_count(pga[mapped].pg),
534                        mapped, page_count - 1);
535
536                 local[mapped].offset = pga[mapped].off;
537                 local[mapped].len = pga[mapped].count;
538                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
539                                 pga[mapped].flag, 0);
540         }
541
542         size[1] = page_count * sizeof(*remote);
543         request->rq_replen = lustre_msg_size(2, size);
544         rc = ptlrpc_queue_wait(request);
545         rc = ptlrpc_check_status(request, rc);
546         if (rc)
547                 GOTO(out_unmap, rc);
548
549         nioptr = lustre_msg_buf(request->rq_repmsg, 1);
550         if (!nioptr)
551                 GOTO(out_unmap, rc = -EINVAL);
552
553         if (request->rq_repmsg->buflens[1] != size[1]) {
554                 CERROR("buffer length wrong (%d vs. %d)\n",
555                        request->rq_repmsg->buflens[1], size[1]);
556                 GOTO(out_unmap, rc = -EINVAL);
557         }
558
559         for (j = 0; j < page_count; j++) {
560                 struct ptlrpc_bulk_page *bulk;
561
562                 ost_unpack_niobuf(&nioptr, &remote);
563
564                 bulk = ptlrpc_prep_bulk_page(desc);
565                 if (!bulk)
566                         GOTO(out_unmap, rc = -ENOMEM);
567
568                 bulk->bp_buf = (void *)(unsigned long)local[j].addr;
569                 bulk->bp_buflen = local[j].len;
570                 bulk->bp_xid = remote->xid;
571                 bulk->bp_page = pga[j].pg;
572         }
573
574         if (desc->bd_page_count != page_count)
575                 LBUG();
576
577         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
578                 GOTO(out_unmap, rc = 0);
579
580         /* Our reference is released when brw_finish is complete. */
581         rc = ptlrpc_send_bulk(desc);
582
583         /* XXX: Mike, same question as in osc_brw_read. */
584         if (rc)
585                 GOTO(out_req, rc);
586
587         /* Callbacks cause asynchronous handling. */
588         rc = callback(data, 0, CB_PHASE_START);
589
590 out_req:
591         ptlrpc_req_finished(request);
592         RETURN(rc);
593
594         /* Clean up on error. */
595 out_unmap:
596         while (mapped-- > 0)
597                 kunmap(pga[mapped].pg);
598
599         OBD_FREE(local, page_count * sizeof(*local));
600 out_cb:
601         OBD_FREE(cb_data, sizeof(*cb_data));
602 out_desc:
603         ptlrpc_bulk_decref(desc);
604         goto out_req;
605 }
606
607 static int osc_brw(int cmd, struct lustre_handle *conn,
608                    struct lov_stripe_md *md, obd_count page_count,
609                    struct brw_page *pga, brw_callback_t callback,
610                    struct io_cb_data *data)
611 {
612         if (cmd & OBD_BRW_WRITE)
613                 return osc_brw_write(conn, md, page_count, pga, callback, data);
614         else
615                 return osc_brw_read(conn, md, page_count, pga, callback, data);
616 }
617
618 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
619                        struct lustre_handle *parent_lock,
620                        __u32 type, void *extentp, int extent_len, __u32 mode,
621                        int *flags, void *callback, void *data, int datalen,
622                        struct lustre_handle *lockh)
623 {
624         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
625         struct obd_device *obddev = class_conn2obd(connh);
626         struct ldlm_extent *extent = extentp;
627         int rc;
628         ENTRY;
629
630         /* Filesystem locks are given a bit of special treatment: if
631          * this is not a file size lock (which has end == -1), we
632          * fixup the lock to start and end on page boundaries. */
633         if (extent->end != OBD_OBJECT_EOF) {
634                 extent->start &= PAGE_MASK;
635                 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
636         }
637
638         /* Next, search for already existing extent locks that will cover us */
639         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
640                              sizeof(extent), mode, lockh);
641         if (rc == 1)
642                 RETURN(0); /* We already have a lock, and it's referenced */
643
644         /* If we're trying to read, we also search for an existing PW lock.  The
645          * VFS and page cache already protect us locally, so lots of readers/
646          * writers can share a single PW lock.
647          *
648          * There are problems with conversion deadlocks, so instead of
649          * converting a read lock to a write lock, we'll just enqueue a new
650          * one.
651          *
652          * At some point we should cancel the read lock instead of making them
653          * send us a blocking callback, but there are problems with canceling
654          * locks out from other users right now, too. */
655
656         if (mode == LCK_PR) {
657                 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
658                                      extent, sizeof(extent), LCK_PW, lockh);
659                 if (rc == 1) {
660                         /* FIXME: This is not incredibly elegant, but it might
661                          * be more elegant than adding another parameter to
662                          * lock_match.  I want a second opinion. */
663                         ldlm_lock_addref(lockh, LCK_PR);
664                         ldlm_lock_decref(lockh, LCK_PW);
665
666                         RETURN(0);
667                 }
668         }
669
670         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
671                               res_id, type, extent, sizeof(extent), mode, flags,
672                               ldlm_completion_ast, callback, data, datalen,
673                               lockh);
674         RETURN(rc);
675 }
676
677 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
678                       __u32 mode, struct lustre_handle *lockh)
679 {
680         ENTRY;
681
682         ldlm_lock_decref(lockh, mode);
683
684         RETURN(0);
685 }
686
687 static int osc_cancel_unused(struct lustre_handle *connh,
688                              struct lov_stripe_md *lsm, int local)
689 {
690         struct obd_device *obddev = class_conn2obd(connh);
691         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
692
693         return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, local);
694 }
695
696 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
697 {
698         struct ptlrpc_request *request;
699         int rc, size = sizeof(*osfs);
700         ENTRY;
701
702         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
703                                   NULL);
704         if (!request)
705                 RETURN(-ENOMEM);
706
707         request->rq_replen = lustre_msg_size(1, &size);
708
709         rc = ptlrpc_queue_wait(request);
710         rc = ptlrpc_check_status(request, rc);
711         if (rc) {
712                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
713                 GOTO(out, rc);
714         }
715
716         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
717
718         EXIT;
719  out:
720         ptlrpc_req_finished(request);
721         return rc;
722 }
723
724 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
725                          void *karg, void *uarg)
726 {
727         struct obd_device *obddev = class_conn2obd(conn);
728         struct obd_ioctl_data *data = karg;
729         int err = 0;
730         ENTRY;
731
732         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE ||
733             _IOC_NR(cmd) < IOC_LDLM_MIN_NR || _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
734                 CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
735                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
736                 RETURN(-EINVAL);
737         }
738
739         switch (cmd) {
740         case IOC_LDLM_TEST: {
741                 err = ldlm_test(obddev, conn);
742                 CERROR("-- done err %d\n", err);
743                 GOTO(out, err);
744         }
745         case IOC_LDLM_REGRESS_START: {
746                 unsigned int numthreads = 1;
747                 unsigned int numheld = 10;
748                 unsigned int numres = 10;
749                 unsigned int numext = 10;
750                 char *parse;
751
752                 if (data->ioc_inllen1) {
753                         parse = data->ioc_inlbuf1;
754                         if (*parse != '\0') {
755                                 while(isspace(*parse)) parse++;
756                                 numthreads = simple_strtoul(parse, &parse, 0);
757                                 while(isspace(*parse)) parse++;
758                         }
759                         if (*parse != '\0') {
760                                 while(isspace(*parse)) parse++;
761                                 numheld = simple_strtoul(parse, &parse, 0);
762                                 while(isspace(*parse)) parse++;
763                         }
764                         if (*parse != '\0') {
765                                 while(isspace(*parse)) parse++;
766                                 numres = simple_strtoul(parse, &parse, 0);
767                                 while(isspace(*parse)) parse++;
768                         }
769                         if (*parse != '\0') {
770                                 while(isspace(*parse)) parse++;
771                                 numext = simple_strtoul(parse, &parse, 0);
772                                 while(isspace(*parse)) parse++;
773                         }
774                 }
775
776                 err = ldlm_regression_start(obddev, conn, numthreads,
777                                 numheld, numres, numext);
778
779                 CERROR("-- done err %d\n", err);
780                 GOTO(out, err);
781         }
782         case IOC_LDLM_REGRESS_STOP: {
783                 err = ldlm_regression_stop();
784                 CERROR("-- done err %d\n", err);
785                 GOTO(out, err);
786         }
787         default:
788                 GOTO(out, err = -EINVAL);
789         }
790 out:
791         return err;
792 }
793
794 struct obd_ops osc_obd_ops = {
795         o_setup:        client_obd_setup,
796         o_cleanup:      client_obd_cleanup,
797         o_statfs:       osc_statfs,
798         o_create:       osc_create,
799         o_destroy:      osc_destroy,
800         o_getattr:      osc_getattr,
801         o_setattr:      osc_setattr,
802         o_open:         osc_open,
803         o_close:        osc_close,
804         o_connect:      client_obd_connect,
805         o_disconnect:   client_obd_disconnect,
806         o_brw:          osc_brw,
807         o_punch:        osc_punch,
808         o_enqueue:      osc_enqueue,
809         o_cancel:       osc_cancel,
810         o_cancel_unused: osc_cancel_unused,
811         o_iocontrol:    osc_iocontrol
812 };
813
814 static int __init osc_init(void)
815 {
816         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
817 }
818
819 static void __exit osc_exit(void)
820 {
821         class_unregister_type(LUSTRE_OSC_NAME);
822 }
823
824 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
825 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
826 MODULE_LICENSE("GPL");
827
828 module_init(osc_init);
829 module_exit(osc_exit);