/* Whamcloud - gitweb
 * b=201
 * [fs/lustre-release.git] / lustre / osc / osc_request.c
 */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *
6  *  This code is issued under the GNU General Public License.
7  *  See the file COPYING in this distribution
8  *
9  *  Author Peter Braam <braam@clusterfs.com>
10  *
11  *  This server is single threaded at present (but can easily be multi
12  *  threaded). For testing and management it is treated as an
13  *  obd_device, although it does not export a full OBD method table
14  *  (the requests are coming in over the wire, so object target
15  *  modules do not have a full method table.)
16  *
17  */
18
19 #define EXPORT_SYMTAB
20 #define DEBUG_SUBSYSTEM S_OSC
21
22 #include <linux/version.h>
23 #include <linux/module.h>
24 #include <linux/mm.h>
25 #include <linux/highmem.h>
26 #include <linux/lustre_dlm.h>
27 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
28 #include <linux/workqueue.h>
29 #endif
30 #include <linux/kp30.h>
31 #include <linux/lustre_mds.h> /* for mds_objid */
32 #include <linux/obd_ost.h>
33 #include <linux/obd_lov.h>
34 #include <linux/ctype.h>
35 #include <linux/init.h>
36 #include <linux/lustre_ha.h>
37 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
38 #include <linux/lustre_lite.h> /* for ll_i2info */
39 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
40
41 static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
42                        struct lov_stripe_md *md)
43 {
44         struct ptlrpc_request *request;
45         struct ost_body *body;
46         int rc, size = sizeof(*body);
47         ENTRY;
48
49         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
50                                   &size, NULL);
51         if (!request)
52                 RETURN(-ENOMEM);
53
54         body = lustre_msg_buf(request->rq_reqmsg, 0);
55 #warning FIXME: pack only valid fields instead of memcpy, endianness
56         memcpy(&body->oa, oa, sizeof(*oa));
57
58         request->rq_replen = lustre_msg_size(1, &size);
59
60         rc = ptlrpc_queue_wait(request);
61         rc = ptlrpc_check_status(request, rc);
62         if (rc) {
63                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
64                 GOTO(out, rc);
65         }
66
67         body = lustre_msg_buf(request->rq_repmsg, 0);
68         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
69         if (oa)
70                 memcpy(oa, &body->oa, sizeof(*oa));
71
72         EXIT;
73  out:
74         ptlrpc_req_finished(request);
75         return rc;
76 }
77
78 static int osc_open(struct lustre_handle *conn, struct obdo *oa,
79                     struct lov_stripe_md *md)
80 {
81         struct ptlrpc_request *request;
82         struct ost_body *body;
83         int rc, size = sizeof(*body);
84         ENTRY;
85
86         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
87                                   NULL);
88         if (!request)
89                 RETURN(-ENOMEM);
90
91         body = lustre_msg_buf(request->rq_reqmsg, 0);
92 #warning FIXME: pack only valid fields instead of memcpy, endianness
93         memcpy(&body->oa, oa, sizeof(*oa));
94
95         request->rq_replen = lustre_msg_size(1, &size);
96
97         rc = ptlrpc_queue_wait(request);
98         rc = ptlrpc_check_status(request, rc);
99         if (rc)
100                 GOTO(out, rc);
101
102         body = lustre_msg_buf(request->rq_repmsg, 0);
103         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
104         if (oa)
105                 memcpy(oa, &body->oa, sizeof(*oa));
106
107         EXIT;
108  out:
109         ptlrpc_req_finished(request);
110         return rc;
111 }
112
113 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
114                      struct lov_stripe_md *md)
115 {
116         struct ptlrpc_request *request;
117         struct ost_body *body;
118         int rc, size = sizeof(*body);
119         ENTRY;
120
121         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
122                                   NULL);
123         if (!request)
124                 RETURN(-ENOMEM);
125
126         body = lustre_msg_buf(request->rq_reqmsg, 0);
127 #warning FIXME: pack only valid fields instead of memcpy, endianness
128         memcpy(&body->oa, oa, sizeof(*oa));
129
130         request->rq_replen = lustre_msg_size(1, &size);
131
132         rc = ptlrpc_queue_wait(request);
133         rc = ptlrpc_check_status(request, rc);
134         if (rc)
135                 GOTO(out, rc);
136
137         body = lustre_msg_buf(request->rq_repmsg, 0);
138         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
139         if (oa)
140                 memcpy(oa, &body->oa, sizeof(*oa));
141
142         EXIT;
143  out:
144         ptlrpc_req_finished(request);
145         return rc;
146 }
147
148 static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
149                        struct lov_stripe_md *md)
150 {
151         struct ptlrpc_request *request;
152         struct ost_body *body;
153         int rc, size = sizeof(*body);
154         ENTRY;
155
156         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
157                                   &size, NULL);
158         if (!request)
159                 RETURN(-ENOMEM);
160
161         body = lustre_msg_buf(request->rq_reqmsg, 0);
162         memcpy(&body->oa, oa, sizeof(*oa));
163
164         request->rq_replen = lustre_msg_size(1, &size);
165
166         rc = ptlrpc_queue_wait(request);
167         rc = ptlrpc_check_status(request, rc);
168
169         ptlrpc_req_finished(request);
170         return rc;
171 }
172
/* Create a new object on the OST.
 *
 * If *ea is NULL, a lov_stripe_md of oa->o_easize bytes is allocated
 * here and handed back through *ea on success; on failure that
 * allocation is freed again before returning.  If *ea is non-NULL the
 * caller's stripe md is reused.  On success the object id returned by
 * the OST is stored in both *oa and the stripe md. */
static int osc_create(struct lustre_handle *conn, struct obdo *oa,
                      struct lov_stripe_md **ea)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size = sizeof(*body);
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                // XXX check oa->o_valid & OBD_MD_FLEASIZE first...
                OBD_ALLOC(lsm, oa->o_easize);
                if (!lsm)
                        RETURN(-ENOMEM);
                lsm->lsm_mds_easize = oa->o_easize;
        }

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
                                  NULL);
        if (!request)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);
        if (rc)
                GOTO(out_req, rc);

        /* The OST filled in the new object id; copy the whole obdo back. */
        body = lustre_msg_buf(request->rq_repmsg, 0);
        memcpy(oa, &body->oa, sizeof(*oa));

        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_stripe_count = 0;
        *ea = lsm;
        EXIT;
out_req:
        ptlrpc_req_finished(request);
out:
        /* Free lsm only if we allocated it here (*ea is still NULL) and
         * the create failed; a caller-supplied md is never freed. */
        if (rc && !*ea)
                OBD_FREE(lsm, oa->o_easize);
        return rc;
}
223
224 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
225                      struct lov_stripe_md *md, obd_size start,
226                      obd_size end)
227 {
228         struct ptlrpc_request *request;
229         struct ost_body *body;
230         int rc, size = sizeof(*body);
231         ENTRY;
232
233         if (!oa) {
234                 CERROR("oa NULL\n");
235                 RETURN(-EINVAL);
236         }
237
238         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
239                                   NULL);
240         if (!request)
241                 RETURN(-ENOMEM);
242
243         body = lustre_msg_buf(request->rq_reqmsg, 0);
244 #warning FIXME: pack only valid fields instead of memcpy, endianness, valid
245         memcpy(&body->oa, oa, sizeof(*oa));
246
247         /* overload the size and blocks fields in the oa with start/end */
248         body->oa.o_size = HTON__u64(start);
249         body->oa.o_blocks = HTON__u64(end);
250         body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
251
252         request->rq_replen = lustre_msg_size(1, &size);
253
254         rc = ptlrpc_queue_wait(request);
255         rc = ptlrpc_check_status(request, rc);
256         if (rc)
257                 GOTO(out, rc);
258
259         body = lustre_msg_buf(request->rq_repmsg, 0);
260         memcpy(oa, &body->oa, sizeof(*oa));
261
262         EXIT;
263  out:
264         ptlrpc_req_finished(request);
265         return rc;
266 }
267
268 static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
269                        struct lov_stripe_md *ea)
270 {
271         struct ptlrpc_request *request;
272         struct ost_body *body;
273         int rc, size = sizeof(*body);
274         ENTRY;
275
276         if (!oa) {
277                 CERROR("oa NULL\n");
278                 RETURN(-EINVAL);
279         }
280         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
281                                   &size, NULL);
282         if (!request)
283                 RETURN(-ENOMEM);
284
285         body = lustre_msg_buf(request->rq_reqmsg, 0);
286 #warning FIXME: pack only valid fields instead of memcpy, endianness
287         memcpy(&body->oa, oa, sizeof(*oa));
288
289         request->rq_replen = lustre_msg_size(1, &size);
290
291         rc = ptlrpc_queue_wait(request);
292         rc = ptlrpc_check_status(request, rc);
293         if (rc)
294                 GOTO(out, rc);
295
296         body = lustre_msg_buf(request->rq_repmsg, 0);
297         memcpy(oa, &body->oa, sizeof(*oa));
298
299         EXIT;
300  out:
301         ptlrpc_req_finished(request);
302         return rc;
303 }
304
/* Per-request state handed to brw_finish() through desc->bd_cb_data. */
struct osc_brw_cb_data {
        brw_callback_t callback;  /* caller's completion callback */
        void *cb_data;            /* argument passed to ->callback */
        void *obd_data;           /* write path: niobuf_local array to free */
        size_t obd_size;          /* size of the obd_data allocation */
};
311
312 /* Our bulk-unmapping bottom half. */
313 static void unmap_and_decref_bulk_desc(void *data)
314 {
315         struct ptlrpc_bulk_desc *desc = data;
316         struct list_head *tmp;
317         ENTRY;
318
319         /* This feels wrong to me. */
320         list_for_each(tmp, &desc->bd_page_list) {
321                 struct ptlrpc_bulk_page *bulk;
322                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
323
324                 kunmap(bulk->bp_page);
325         }
326
327         ptlrpc_bulk_decref(desc);
328         EXIT;
329 }
330
/* Bulk-transfer completion callback (may run in interrupt context).
 * Maps the descriptor's timeout/interrupt flags to an errno, invokes
 * the caller's brw callback for CB_PHASE_FINISH, frees the per-request
 * callback state, and defers page unmapping to process context. */
static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
{
        struct osc_brw_cb_data *cb_data = data;
        int err = 0;
        ENTRY;

        if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
                /* Distinguish an interrupted wait from a plain timeout. */
                err = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
                       -ETIMEDOUT);
        }

        if (cb_data->callback)
                cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);

        /* obd_data is only set on the write path (the niobuf_local array). */
        if (cb_data->obd_data)
                OBD_FREE(cb_data->obd_data, cb_data->obd_size);
        OBD_FREE(cb_data, sizeof(*cb_data));

        /* We can't kunmap the desc from interrupt context, so we do it from
         * the bottom half above. */
        prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
        schedule_work(&desc->bd_queue);

        EXIT;
}
356
/* Read up to PTL_MD_MAX_IOV pages from the OST via one OST_READ RPC.
 *
 * Request layout: { ost_body, obd_ioobj, niobuf_remote[page_count] }.
 * Each page is kmapped and attached to a bulk descriptor that shares a
 * single xid; the bulk sink is registered BEFORE the request is queued
 * so the data can land even if it beats the reply.  On success the
 * caller's callback is invoked with CB_PHASE_START and completion
 * continues asynchronously through brw_finish().  On error all
 * mappings and allocations made here are rolled back. */
static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                        obd_count page_count, struct brw_page *pga,
                        brw_callback_t callback, struct io_cb_data *data)
{
        struct ptlrpc_connection *connection =
                client_conn2cli(conn)->cl_import.imp_connection;
        struct ptlrpc_request *request = NULL;
        struct ptlrpc_bulk_desc *desc = NULL;
        struct ost_body *body;
        struct osc_brw_cb_data *cb_data = NULL;
        int rc, size[3] = {sizeof(*body)};
        void *iooptr, *nioptr;
        int mapped = 0;
        __u32 xid;
        ENTRY;

        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(struct niobuf_remote);

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
                                  NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);

        desc = ptlrpc_prep_bulk(connection);
        if (!desc)
                GOTO(out_req, rc = -ENOMEM);
        desc->bd_portal = OST_BULK_PORTAL;
        desc->bd_cb = brw_finish;
        OBD_ALLOC(cb_data, sizeof(*cb_data));
        if (!cb_data)
                GOTO(out_desc, rc = -ENOMEM);

        cb_data->callback = callback;
        cb_data->cb_data = data;
        CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
        data->desc = desc;
        desc->bd_cb_data = cb_data;

        /* Pack the ioobj and per-page niobufs into message buffers 1 and 2. */
        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
        ost_pack_ioo(&iooptr, lsm, page_count);
        /* end almost identical to brw_write case */

        spin_lock(&connection->c_lock);
        xid = ++connection->c_xid_out;       /* single xid for all pages */
        spin_unlock(&connection->c_lock);

        for (mapped = 0; mapped < page_count; mapped++) {
                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
                if (bulk == NULL)
                        GOTO(out_unmap, rc = -ENOMEM);

                bulk->bp_xid = xid;           /* single xid for all pages */

                /* kmap is undone in the bottom half on the success path,
                 * or in out_unmap below on error. */
                bulk->bp_buf = kmap(pga[mapped].pg);
                bulk->bp_page = pga[mapped].pg;
                bulk->bp_buflen = PAGE_SIZE;
                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
                                pga[mapped].flag, bulk->bp_xid);
        }

        /*
         * Register the bulk first, because the reply could arrive out of order,
         * and we want to be ready for the bulk data.
         *
         * The reference is released when brw_finish is complete.
         *
         * On error, we never do the brw_finish, so we handle all decrefs.
         */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
                CERROR("obd_fail_loc=%x, skipping register_bulk\n",
                       OBD_FAIL_OSC_BRW_READ_BULK);
        } else {
                rc = ptlrpc_register_bulk(desc);
                if (rc)
                        GOTO(out_unmap, rc);
        }

        request->rq_replen = lustre_msg_size(1, size);
        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);

        /*
         * XXX: If there is an error during the processing of the callback,
         *      such as a timeout in a sleep that it performs, brw_finish
         *      will never get called, and we'll leak the desc, fail to kunmap
         *      things, cats will live with dogs.  One solution would be to
         *      export brw_finish as osc_brw_finish, so that the timeout case
         *      and its kin could call it for proper cleanup.  An alternative
         *      would be for an error return from the callback to cause us to
         *      clean up, but that doesn't help the truly async cases (like
         *      LOV), which will immediately return from their PHASE_START
         *      callback, before any such cleanup-requiring error condition can
         *      be detected.
         */
        if (rc)
                GOTO(out_req, rc);

        /* Callbacks cause asynchronous handling. */
        rc = callback(data, 0, CB_PHASE_START);

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);

        /* Clean up on error. */
out_unmap:
        while (mapped-- > 0)
                kunmap(pga[mapped].pg);
        OBD_FREE(cb_data, sizeof(*cb_data));
out_desc:
        ptlrpc_bulk_decref(desc);
        goto out_req;
}
474
/* Write up to PTL_MD_MAX_IOV pages to the OST via one OST_WRITE RPC.
 *
 * Request layout: { ost_body, obd_ioobj, niobuf_remote[page_count] }.
 * Pages are kmapped and recorded in a niobuf_local array (freed later
 * by brw_finish).  Unlike the read path, the request is queued FIRST;
 * the reply carries the remote niobufs (with the xids the server
 * expects), which are used to set up the bulk pages before the data is
 * actually sent with ptlrpc_send_bulk().  Completion then continues
 * asynchronously through brw_finish(); errors roll back all mappings
 * and allocations made here. */
static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
                         obd_count page_count, struct brw_page *pga,
                         brw_callback_t callback, struct io_cb_data *data)
{
        struct ptlrpc_connection *connection =
                client_conn2cli(conn)->cl_import.imp_connection;
        struct ptlrpc_request *request = NULL;
        struct ptlrpc_bulk_desc *desc = NULL;
        struct ost_body *body;
        struct niobuf_local *local = NULL;
        struct niobuf_remote *remote;
        struct osc_brw_cb_data *cb_data = NULL;
        int rc, j, size[3] = {sizeof(*body)};
        void *iooptr, *nioptr;
        int mapped = 0;
        ENTRY;

        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*remote);

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
                                  NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);

        desc = ptlrpc_prep_bulk(connection);
        if (!desc)
                GOTO(out_req, rc = -ENOMEM);
        desc->bd_portal = OSC_BULK_PORTAL;
        desc->bd_cb = brw_finish;
        OBD_ALLOC(cb_data, sizeof(*cb_data));
        if (!cb_data)
                GOTO(out_desc, rc = -ENOMEM);

        cb_data->callback = callback;
        cb_data->cb_data = data;
        CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
        data->desc = desc;
        desc->bd_cb_data = cb_data;

        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
        ost_pack_ioo(&iooptr, md, page_count);
        /* end almost identical to brw_read case */

        /* Ownership of 'local' passes to cb_data; brw_finish frees it. */
        OBD_ALLOC(local, page_count * sizeof(*local));
        if (!local)
                GOTO(out_cb, rc = -ENOMEM);

        cb_data->obd_data = local;
        cb_data->obd_size = page_count * sizeof(*local);

        for (mapped = 0; mapped < page_count; mapped++) {
                local[mapped].addr = kmap(pga[mapped].pg);

                /* NOTE: page_count() here is the kernel page-refcount macro,
                 * not our page_count parameter (macro expansion wins). */
                CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
                       "%d ; page %d of %d\n",
                       local[mapped].addr, pga[mapped].pg->flags,
                       page_count(pga[mapped].pg),
                       mapped, page_count - 1);

                local[mapped].offset = pga[mapped].off;
                local[mapped].len = pga[mapped].count;
                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
                                pga[mapped].flag, 0);
        }

        /* Reply layout: { ost_body, niobuf_remote[page_count] }. */
        size[1] = page_count * sizeof(*remote);
        request->rq_replen = lustre_msg_size(2, size);
        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);
        if (rc)
                GOTO(out_unmap, rc);

        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
        if (!nioptr)
                GOTO(out_unmap, rc = -EINVAL);

        if (request->rq_repmsg->buflens[1] != size[1]) {
                CERROR("buffer length wrong (%d vs. %d)\n",
                       request->rq_repmsg->buflens[1], size[1]);
                GOTO(out_unmap, rc = -EINVAL);
        }

        /* Attach each mapped page to the bulk descriptor using the xid
         * the server handed back for it. */
        for (j = 0; j < page_count; j++) {
                struct ptlrpc_bulk_page *bulk;

                ost_unpack_niobuf(&nioptr, &remote);

                bulk = ptlrpc_prep_bulk_page(desc);
                if (!bulk)
                        GOTO(out_unmap, rc = -ENOMEM);

                bulk->bp_buf = (void *)(unsigned long)local[j].addr;
                bulk->bp_buflen = local[j].len;
                bulk->bp_xid = remote->xid;
                bulk->bp_page = pga[j].pg;
        }

        if (desc->bd_page_count != page_count)
                LBUG();

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
                GOTO(out_unmap, rc = 0);

        /* Our reference is released when brw_finish is complete. */
        rc = ptlrpc_send_bulk(desc);

        /* XXX: Mike, same question as in osc_brw_read. */
        if (rc)
                GOTO(out_req, rc);

        /* Callbacks cause asynchronous handling. */
        rc = callback(data, 0, CB_PHASE_START);

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);

        /* Clean up on error. */
out_unmap:
        while (mapped-- > 0)
                kunmap(pga[mapped].pg);

        OBD_FREE(local, page_count * sizeof(*local));
out_cb:
        OBD_FREE(cb_data, sizeof(*cb_data));
out_desc:
        ptlrpc_bulk_decref(desc);
        goto out_req;
}
608
609 static int osc_brw(int cmd, struct lustre_handle *conn,
610                    struct lov_stripe_md *md, obd_count page_count,
611                    struct brw_page *pga, brw_callback_t callback,
612                    struct io_cb_data *data)
613 {
614         ENTRY;
615
616         while (page_count) {
617                 obd_count pages_per_brw;
618                 int rc;
619
620                 if (page_count > PTL_MD_MAX_IOV)
621                         pages_per_brw = PTL_MD_MAX_IOV;
622                 else
623                         pages_per_brw = page_count;
624
625                 if (cmd & OBD_BRW_WRITE)
626                         rc = osc_brw_write(conn, md, pages_per_brw, pga,
627                                            callback, data);
628                 else
629                         rc = osc_brw_read(conn, md, pages_per_brw, pga,
630                                           callback, data);
631
632                 if (rc != 0)
633                         RETURN(rc);
634
635                 page_count -= pages_per_brw;
636                 pga += pages_per_brw;
637         }
638         RETURN(0);
639 }
640
641 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
642                        struct lustre_handle *parent_lock,
643                        __u32 type, void *extentp, int extent_len, __u32 mode,
644                        int *flags, void *callback, void *data, int datalen,
645                        struct lustre_handle *lockh)
646 {
647         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
648         struct obd_device *obddev = class_conn2obd(connh);
649         struct ldlm_extent *extent = extentp;
650         int rc;
651         ENTRY;
652
653         /* Filesystem locks are given a bit of special treatment: if
654          * this is not a file size lock (which has end == -1), we
655          * fixup the lock to start and end on page boundaries. */
656         if (extent->end != OBD_OBJECT_EOF) {
657                 extent->start &= PAGE_MASK;
658                 extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
659         }
660
661         /* Next, search for already existing extent locks that will cover us */
662         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
663                              sizeof(extent), mode, lockh);
664         if (rc == 1)
665                 /* We already have a lock, and it's referenced */
666                 RETURN(ELDLM_OK);
667
668         /* If we're trying to read, we also search for an existing PW lock.  The
669          * VFS and page cache already protect us locally, so lots of readers/
670          * writers can share a single PW lock.
671          *
672          * There are problems with conversion deadlocks, so instead of
673          * converting a read lock to a write lock, we'll just enqueue a new
674          * one.
675          *
676          * At some point we should cancel the read lock instead of making them
677          * send us a blocking callback, but there are problems with canceling
678          * locks out from other users right now, too. */
679
680         if (mode == LCK_PR) {
681                 rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
682                                      extent, sizeof(extent), LCK_PW, lockh);
683                 if (rc == 1) {
684                         /* FIXME: This is not incredibly elegant, but it might
685                          * be more elegant than adding another parameter to
686                          * lock_match.  I want a second opinion. */
687                         ldlm_lock_addref(lockh, LCK_PR);
688                         ldlm_lock_decref(lockh, LCK_PW);
689
690                         RETURN(ELDLM_OK);
691                 }
692         }
693
694         rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
695                               res_id, type, extent, sizeof(extent), mode, flags,
696                               ldlm_completion_ast, callback, data, datalen,
697                               lockh);
698         RETURN(rc);
699 }
700
701 static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
702                       __u32 mode, struct lustre_handle *lockh)
703 {
704         ENTRY;
705
706         ldlm_lock_decref(lockh, mode);
707
708         RETURN(0);
709 }
710
711 static int osc_cancel_unused(struct lustre_handle *connh,
712                              struct lov_stripe_md *lsm, int flags)
713 {
714         struct obd_device *obddev = class_conn2obd(connh);
715         __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
716
717         return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags);
718 }
719
720 static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
721 {
722         struct ptlrpc_request *request;
723         int rc, size = sizeof(*osfs);
724         ENTRY;
725
726         request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
727                                   NULL);
728         if (!request)
729                 RETURN(-ENOMEM);
730
731         request->rq_replen = lustre_msg_size(1, &size);
732
733         rc = ptlrpc_queue_wait(request);
734         rc = ptlrpc_check_status(request, rc);
735         if (rc) {
736                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
737                 GOTO(out, rc);
738         }
739
740         obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
741
742         EXIT;
743  out:
744         ptlrpc_req_finished(request);
745         return rc;
746 }
747
748 static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
749                          void *karg, void *uarg)
750 {
751         struct obd_device *obddev = class_conn2obd(conn);
752         struct obd_ioctl_data *data = karg;
753         int err = 0;
754         ENTRY;
755
756         switch (cmd) {
757         case IOC_LDLM_TEST: {
758                 err = ldlm_test(obddev, conn);
759                 CERROR("-- done err %d\n", err);
760                 GOTO(out, err);
761         }
762         case IOC_LDLM_REGRESS_START: {
763                 unsigned int numthreads = 1;
764                 unsigned int numheld = 10;
765                 unsigned int numres = 10;
766                 unsigned int numext = 10;
767                 char *parse;
768
769                 if (data->ioc_inllen1) {
770                         parse = data->ioc_inlbuf1;
771                         if (*parse != '\0') {
772                                 while(isspace(*parse)) parse++;
773                                 numthreads = simple_strtoul(parse, &parse, 0);
774                                 while(isspace(*parse)) parse++;
775                         }
776                         if (*parse != '\0') {
777                                 while(isspace(*parse)) parse++;
778                                 numheld = simple_strtoul(parse, &parse, 0);
779                                 while(isspace(*parse)) parse++;
780                         }
781                         if (*parse != '\0') {
782                                 while(isspace(*parse)) parse++;
783                                 numres = simple_strtoul(parse, &parse, 0);
784                                 while(isspace(*parse)) parse++;
785                         }
786                         if (*parse != '\0') {
787                                 while(isspace(*parse)) parse++;
788                                 numext = simple_strtoul(parse, &parse, 0);
789                                 while(isspace(*parse)) parse++;
790                         }
791                 }
792
793                 err = ldlm_regression_start(obddev, conn, numthreads,
794                                 numheld, numres, numext);
795
796                 CERROR("-- done err %d\n", err);
797                 GOTO(out, err);
798         }
799         case IOC_LDLM_REGRESS_STOP: {
800                 err = ldlm_regression_stop();
801                 CERROR("-- done err %d\n", err);
802                 GOTO(out, err);
803         }
804         case IOC_OSC_REGISTER_LOV: {
805                 if (obddev->u.cli.cl_containing_lov)
806                         GOTO(out, err = -EALREADY);
807                 obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
808                 GOTO(out, err);
809         }
810
811         default:
812                 GOTO(out, err = -ENOTTY);
813         }
814 out:
815         return err;
816 }
817
/* OBD method table for the OSC device type.  Connection management and
 * setup/cleanup are delegated to the generic client routines
 * (client_obd_*); the remaining methods map to the OST request
 * builders defined above. */
struct obd_ops osc_obd_ops = {
        o_setup:        client_obd_setup,
        o_cleanup:      client_obd_cleanup,
        o_statfs:       osc_statfs,
        o_create:       osc_create,
        o_destroy:      osc_destroy,
        o_getattr:      osc_getattr,
        o_setattr:      osc_setattr,
        o_open:         osc_open,
        o_close:        osc_close,
        o_connect:      client_obd_connect,
        o_disconnect:   client_obd_disconnect,
        o_brw:          osc_brw,
        o_punch:        osc_punch,
        o_enqueue:      osc_enqueue,
        o_cancel:       osc_cancel,
        o_cancel_unused: osc_cancel_unused,
        o_iocontrol:    osc_iocontrol
};
837
838 static int __init osc_init(void)
839 {
840         return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
841 }
842
843 static void __exit osc_exit(void)
844 {
845         class_unregister_type(LUSTRE_OSC_NAME);
846 }
847
848 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
849 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
850 MODULE_LICENSE("GPL");
851
852 module_init(osc_init);
853 module_exit(osc_exit);