[fs/lustre-release.git] / lustre / osc / osc_request.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 *
 *  This code is issued under the GNU General Public License.
 *  See the file COPYING in this distribution
 *
 *  Author Peter Braam <braam@clusterfs.com>
 *
 *  This server is single threaded at present (but can easily be multi
 *  threaded). For testing and management it is treated as an
 *  obd_device, although it does not export a full OBD method table
 *  (the requests are coming in over the wire, so object target
 *  modules do not have a full method table.)
 *
 */

#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_OSC

#include <linux/version.h>
#include <linux/module.h>
#include <linux/mm.h>
#include <linux/highmem.h>
#include <linux/lustre_dlm.h>
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
#include <linux/workqueue.h>
#endif
#include <linux/kp30.h>
#include <linux/lustre_mds.h> /* for mds_objid */
#include <linux/obd_ost.h>
#include <linux/obd_lov.h>
#include <linux/ctype.h>
#include <linux/init.h>
#include <linux/lustre_ha.h>
#include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
#include <linux/lustre_lite.h> /* for ll_i2info */
#include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
#include <linux/lprocfs_status.h>

extern struct lprocfs_vars status_var_nm_1[];
extern struct lprocfs_vars status_class_var[];

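/*
 * The simple object calls below (getattr, open, close, setattr) all follow
 * the same pattern: prepare a one-buffer OST request on the connection's
 * client import, memcpy the caller's obdo into the ost_body, queue the
 * request and wait, check the reply status, and copy the returned obdo
 * back to the caller.  The #warning notes flag that the obdo is copied
 * wholesale rather than packing only the valid fields with proper
 * endianness handling.
 */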
static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md *md)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
                                  &size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        body = lustre_msg_buf(request->rq_repmsg, 0);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        if (oa)
                memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}

static int osc_open(struct lustre_handle *conn, struct obdo *oa,
                    struct lov_stripe_md *md)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
                                  NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);
        if (rc)
                GOTO(out, rc);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        if (oa)
                memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}

static int osc_close(struct lustre_handle *conn, struct obdo *oa,
                     struct lov_stripe_md *md)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
                                  NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);
        if (rc)
                GOTO(out, rc);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        if (oa)
                memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}

static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md *md)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
                                  &size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);

        ptlrpc_req_finished(request);
        return rc;
}

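/*
 * osc_create: if the caller did not pass in a striping descriptor (*ea is
 * NULL), a lov_stripe_md of oa->o_easize bytes is allocated here and, on
 * success, returned through *ea with the new object id filled in.  On
 * failure, an lsm that was allocated locally is freed again.
 */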
static int osc_create(struct lustre_handle *conn, struct obdo *oa,
                      struct lov_stripe_md **ea)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        struct lov_stripe_md *lsm;
        int rc, size = sizeof(*body);
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                // XXX check oa->o_valid & OBD_MD_FLEASIZE first...
                OBD_ALLOC(lsm, oa->o_easize);
                if (!lsm)
                        RETURN(-ENOMEM);
                lsm->lsm_mds_easize = oa->o_easize;
        }

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
                                  NULL);
        if (!request)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);
        if (rc)
                GOTO(out_req, rc);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        memcpy(oa, &body->oa, sizeof(*oa));

        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_stripe_count = 0;
        *ea = lsm;
        EXIT;
out_req:
        ptlrpc_req_finished(request);
out:
        if (rc && !*ea)
                OBD_FREE(lsm, oa->o_easize);
        return rc;
}

static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
                     struct lov_stripe_md *md, obd_size start,
                     obd_size end)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
                                  NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness, valid
        memcpy(&body->oa, oa, sizeof(*oa));

        /* overload the size and blocks fields in the oa with start/end */
        body->oa.o_size = HTON__u64(start);
        body->oa.o_blocks = HTON__u64(end);
        body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);
        if (rc)
                GOTO(out, rc);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}

static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md *ea)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int rc, size = sizeof(*body);
        ENTRY;

        if (!oa) {
                CERROR("oa NULL\n");
                RETURN(-EINVAL);
        }
        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
                                  &size, NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
        memcpy(&body->oa, oa, sizeof(*oa));

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);
        if (rc)
                GOTO(out, rc);

        body = lustre_msg_buf(request->rq_repmsg, 0);
        memcpy(oa, &body->oa, sizeof(*oa));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}

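/*
 * Per-request bookkeeping for bulk I/O: the caller's completion callback and
 * its argument, plus an optional OBD-allocated buffer (the niobuf_local
 * array in the write path) that must be freed when the bulk completes.
 */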
struct osc_brw_cb_data {
        brw_cb_t callback;
        void *cb_data;
        void *obd_data;
        size_t obd_size;
};

/* Our bulk-unmapping bottom half. */
static void unmap_and_decref_bulk_desc(void *data)
{
        struct ptlrpc_bulk_desc *desc = data;
        struct list_head *tmp;
        ENTRY;

        /* This feels wrong to me. */
        list_for_each(tmp, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);

                kunmap(bulk->bp_page);
                obd_kmap_put(1);
        }

        ptlrpc_bulk_decref(desc);
        EXIT;
}

/* This is the callback function invoked by the Portals event handler
 * associated with the bulk_sink and bulk_source queues.
 */

static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc, void *data)
{
        struct osc_brw_cb_data *cb_data = data;
        int err = 0;
        ENTRY;

        if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
                err = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
                       -ETIMEDOUT);
        }

        if (cb_data->callback)
                cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);

        if (cb_data->obd_data)
                OBD_FREE(cb_data->obd_data, cb_data->obd_size);
        OBD_FREE(cb_data, sizeof(*cb_data));

        /* We can't kunmap the desc from interrupt context, so we do it from
         * the bottom half above. */
        prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
        schedule_work(&desc->bd_queue);

        EXIT;
}

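/*
 * Bulk read: pack an obd_ioobj and one niobuf_remote per page into an
 * OST_READ request, kmap each destination page into a ptlrpc_bulk_page
 * (all sharing a single xid), register the bulk sink before the request is
 * sent so the data can arrive in any order relative to the reply, and then
 * hand off to the caller's callback for asynchronous completion.
 */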
static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                        obd_count page_count, struct brw_page *pga,
                        brw_cb_t callback, struct brw_cb_data *data)
{
        struct ptlrpc_connection *connection =
                client_conn2cli(conn)->cl_import.imp_connection;
        struct ptlrpc_request *request = NULL;
        struct ptlrpc_bulk_desc *desc = NULL;
        struct ost_body *body;
        struct osc_brw_cb_data *cb_data = NULL;
        int rc, size[3] = {sizeof(*body)};
        void *iooptr, *nioptr;
        int mapped = 0;
        __u32 xid;
        ENTRY;

        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(struct niobuf_remote);

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
                                  NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);

        desc = ptlrpc_prep_bulk(connection);
        if (!desc)
                GOTO(out_req, rc = -ENOMEM);
        desc->bd_portal = OST_BULK_PORTAL;
        desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
        OBD_ALLOC(cb_data, sizeof(*cb_data));
        if (!cb_data)
                GOTO(out_desc, rc = -ENOMEM);

        cb_data->callback = callback;
        cb_data->cb_data = data;
        CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
        data->brw_desc = desc;
        desc->bd_ptl_ev_data = cb_data;

        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
        ost_pack_ioo(&iooptr, lsm, page_count);
        /* end almost identical to brw_write case */

        spin_lock(&connection->c_lock);
        xid = ++connection->c_xid_out;       /* single xid for all pages */
        spin_unlock(&connection->c_lock);

        obd_kmap_get(page_count, 0);

        for (mapped = 0; mapped < page_count; mapped++) {
                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
                if (bulk == NULL)
                        GOTO(out_unmap, rc = -ENOMEM);

                bulk->bp_xid = xid;           /* single xid for all pages */

                bulk->bp_buf = kmap(pga[mapped].pg);
                bulk->bp_page = pga[mapped].pg;
                bulk->bp_buflen = PAGE_SIZE;
                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
                                pga[mapped].flag, bulk->bp_xid);
        }

        /*
         * Register the bulk first, because the reply could arrive out of
         * order, and we want to be ready for the bulk data.
         *
         * The reference is released when brw_finish is complete.
         *
         * On error, we never do the brw_finish, so we handle all decrefs.
         */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
                CERROR("obd_fail_loc=%x, skipping register_bulk\n",
                       OBD_FAIL_OSC_BRW_READ_BULK);
        } else {
                rc = ptlrpc_register_bulk(desc);
                if (rc)
                        GOTO(out_unmap, rc);
        }

        request->rq_replen = lustre_msg_size(1, size);
        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);

        /*
         * XXX: If there is an error during the processing of the callback,
         *      such as a timeout in a sleep that it performs, brw_finish
         *      will never get called, and we'll leak the desc, fail to kunmap
         *      things, cats will live with dogs.  One solution would be to
         *      export brw_finish as osc_brw_finish, so that the timeout case
         *      and its kin could call it for proper cleanup.  An alternative
         *      would be for an error return from the callback to cause us to
         *      clean up, but that doesn't help the truly async cases (like
         *      LOV), which will immediately return from their PHASE_START
         *      callback, before any such cleanup-requiring error condition can
         *      be detected.
         */
        if (rc)
                GOTO(out_req, rc);

        /* Callbacks cause asynchronous handling. */
        rc = callback(data, 0, CB_PHASE_START);

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);

        /* Clean up on error. */
out_unmap:
        while (mapped-- > 0)
                kunmap(pga[mapped].pg);
        obd_kmap_put(page_count);
        OBD_FREE(cb_data, sizeof(*cb_data));
out_desc:
        ptlrpc_bulk_decref(desc);
        goto out_req;
}

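/*
 * Bulk write: largely mirrors osc_brw_read, but the niobuf_remote
 * descriptors (including the xids to use) come back in the OST_WRITE reply,
 * so the bulk pages are only filled in and sent with ptlrpc_send_bulk()
 * after the reply has been unpacked.  A niobuf_local array is kept in the
 * callback data so it can be freed when the bulk completes.
 */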
static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
                         obd_count page_count, struct brw_page *pga,
                         brw_cb_t callback, struct brw_cb_data *data)
{
        struct ptlrpc_connection *connection =
                client_conn2cli(conn)->cl_import.imp_connection;
        struct ptlrpc_request *request = NULL;
        struct ptlrpc_bulk_desc *desc = NULL;
        struct ost_body *body;
        struct niobuf_local *local = NULL;
        struct niobuf_remote *remote;
        struct osc_brw_cb_data *cb_data = NULL;
        int rc, j, size[3] = {sizeof(*body)};
        void *iooptr, *nioptr;
        int mapped = 0;
        ENTRY;

        size[1] = sizeof(struct obd_ioobj);
        size[2] = page_count * sizeof(*remote);

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
                                  NULL);
        if (!request)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(request->rq_reqmsg, 0);

        desc = ptlrpc_prep_bulk(connection);
        if (!desc)
                GOTO(out_req, rc = -ENOMEM);
        desc->bd_portal = OSC_BULK_PORTAL;
        desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
        OBD_ALLOC(cb_data, sizeof(*cb_data));
        if (!cb_data)
                GOTO(out_desc, rc = -ENOMEM);

        cb_data->callback = callback;
        cb_data->cb_data = data;
        CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
        data->brw_desc = desc;
        desc->bd_ptl_ev_data = cb_data;

        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
        ost_pack_ioo(&iooptr, md, page_count);
        /* end almost identical to brw_read case */

        OBD_ALLOC(local, page_count * sizeof(*local));
        if (!local)
                GOTO(out_cb, rc = -ENOMEM);

        cb_data->obd_data = local;
        cb_data->obd_size = page_count * sizeof(*local);

        obd_kmap_get(page_count, 0);

        for (mapped = 0; mapped < page_count; mapped++) {
                local[mapped].addr = kmap(pga[mapped].pg);

                CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
                       "%d ; page %d of %d\n",
                       local[mapped].addr, pga[mapped].pg->flags,
                       page_count(pga[mapped].pg),
                       mapped, page_count - 1);

                local[mapped].offset = pga[mapped].off;
                local[mapped].len = pga[mapped].count;
                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
                                pga[mapped].flag, 0);
        }

        size[1] = page_count * sizeof(*remote);
        request->rq_replen = lustre_msg_size(2, size);
        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);
        if (rc)
                GOTO(out_unmap, rc);

        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
        if (!nioptr)
                GOTO(out_unmap, rc = -EINVAL);

        if (request->rq_repmsg->buflens[1] != size[1]) {
                CERROR("buffer length wrong (%d vs. %d)\n",
                       request->rq_repmsg->buflens[1], size[1]);
                GOTO(out_unmap, rc = -EINVAL);
        }

        for (j = 0; j < page_count; j++) {
                struct ptlrpc_bulk_page *bulk;

                ost_unpack_niobuf(&nioptr, &remote);

                bulk = ptlrpc_prep_bulk_page(desc);
                if (!bulk)
                        GOTO(out_unmap, rc = -ENOMEM);

                bulk->bp_buf = (void *)(unsigned long)local[j].addr;
                bulk->bp_buflen = local[j].len;
                bulk->bp_xid = remote->xid;
                bulk->bp_page = pga[j].pg;
        }

        if (desc->bd_page_count != page_count)
                LBUG();

        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
                GOTO(out_unmap, rc = 0);

        /* Our reference is released when brw_finish is complete. */
        rc = ptlrpc_send_bulk(desc);

        /* XXX: Mike, same question as in osc_brw_read. */
        if (rc)
                GOTO(out_req, rc);

        /* Callbacks cause asynchronous handling. */
        rc = callback(data, 0, CB_PHASE_START);

out_req:
        ptlrpc_req_finished(request);
        RETURN(rc);

        /* Clean up on error. */
out_unmap:
        while (mapped-- > 0)
                kunmap(pga[mapped].pg);

        obd_kmap_put(page_count);

        OBD_FREE(local, page_count * sizeof(*local));
out_cb:
        OBD_FREE(cb_data, sizeof(*cb_data));
out_desc:
        ptlrpc_bulk_decref(desc);
        goto out_req;
}

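/*
 * osc_brw splits a request into chunks of at most PTL_MD_MAX_IOV pages,
 * presumably because that is the largest I/O vector a single Portals MD
 * (and hence one bulk descriptor) can carry; each chunk becomes one
 * OST_READ or OST_WRITE RPC.  For example, with PTL_MD_MAX_IOV of, say, 16
 * (the real value comes from portals' lib-types.h), a 40-page brw would go
 * out as three RPCs of 16, 16 and 8 pages.
 */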
static int osc_brw(int cmd, struct lustre_handle *conn,
                   struct lov_stripe_md *md, obd_count page_count,
                   struct brw_page *pga, brw_cb_t callback,
                   struct brw_cb_data *data)
{
        ENTRY;

        while (page_count) {
                obd_count pages_per_brw;
                int rc;

                if (page_count > PTL_MD_MAX_IOV)
                        pages_per_brw = PTL_MD_MAX_IOV;
                else
                        pages_per_brw = page_count;

                if (cmd & OBD_BRW_WRITE)
                        rc = osc_brw_write(conn, md, pages_per_brw, pga,
                                           callback, data);
                else
                        rc = osc_brw_read(conn, md, pages_per_brw, pga,
                                          callback, data);

                if (rc != 0)
                        RETURN(rc);

                page_count -= pages_per_brw;
                pga += pages_per_brw;
        }
        RETURN(0);
}

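/*
 * osc_enqueue rounds non-EOF extents out to page boundaries before matching
 * or enqueueing (e.g. assuming 4096-byte pages, a request for bytes
 * 1000-5000 becomes the extent [0, 8192)), then tries to reuse an already
 * granted lock: first one of the requested mode, and for PR requests also
 * an existing PW lock, since the VFS and page cache provide local
 * protection.  Only if no compatible lock is found is a new enqueue sent to
 * the server.
 */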
static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
                       struct lustre_handle *parent_lock,
                       __u32 type, void *extentp, int extent_len, __u32 mode,
                       int *flags, void *callback, void *data, int datalen,
                       struct lustre_handle *lockh)
{
        __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
        struct obd_device *obddev = class_conn2obd(connh);
        struct ldlm_extent *extent = extentp;
        int rc;
        ENTRY;

        /* Filesystem locks are given a bit of special treatment: if
         * this is not a file size lock (which has end == -1), we
         * fixup the lock to start and end on page boundaries. */
        if (extent->end != OBD_OBJECT_EOF) {
                extent->start &= PAGE_MASK;
                extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
        }

        /* Next, search for already existing extent locks that will cover us */
        rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
                             sizeof(extent), mode, lockh);
        if (rc == 1)
                /* We already have a lock, and it's referenced */
                RETURN(ELDLM_OK);

        /* If we're trying to read, we also search for an existing PW lock.  The
         * VFS and page cache already protect us locally, so lots of readers/
         * writers can share a single PW lock.
         *
         * There are problems with conversion deadlocks, so instead of
         * converting a read lock to a write lock, we'll just enqueue a new
         * one.
         *
         * At some point we should cancel the read lock instead of making them
         * send us a blocking callback, but there are problems with canceling
         * locks out from other users right now, too. */

        if (mode == LCK_PR) {
                rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
                                     extent, sizeof(extent), LCK_PW, lockh);
                if (rc == 1) {
                        /* FIXME: This is not incredibly elegant, but it might
                         * be more elegant than adding another parameter to
                         * lock_match.  I want a second opinion. */
                        ldlm_lock_addref(lockh, LCK_PR);
                        ldlm_lock_decref(lockh, LCK_PW);

                        RETURN(ELDLM_OK);
                }
        }

        rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
                              res_id, type, extent, sizeof(extent), mode, flags,
                              ldlm_completion_ast, callback, data, datalen,
                              lockh);
        RETURN(rc);
}

static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
                      __u32 mode, struct lustre_handle *lockh)
{
        ENTRY;

        ldlm_lock_decref(lockh, mode);

        RETURN(0);
}

static int osc_cancel_unused(struct lustre_handle *connh,
                             struct lov_stripe_md *lsm, int flags)
{
        struct obd_device *obddev = class_conn2obd(connh);
        __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };

        return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags);
}

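/* Ask the OST for its filesystem usage; the reply buffer is unpacked with
 * obd_statfs_unpack() into the caller's obd_statfs. */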
static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
{
        struct ptlrpc_request *request;
        int rc, size = sizeof(*osfs);
        ENTRY;

        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
                                  NULL);
        if (!request)
                RETURN(-ENOMEM);

        request->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(request);
        rc = ptlrpc_check_status(request, rc);
        if (rc) {
                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                GOTO(out, rc);
        }

        obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));

        EXIT;
 out:
        ptlrpc_req_finished(request);
        return rc;
}

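/*
 * ioctl entry point: handles the LDLM test/regression controls, lets a
 * containing LOV register itself against this client, and answers
 * OBD_IOC_LOV_GET_CONFIG with a single-target lov_desc describing this OSC,
 * apparently so that tools can treat a bare OSC like a one-stripe LOV.
 */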
static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obddev = class_conn2obd(conn);
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        switch (cmd) {
        case IOC_LDLM_TEST: {
                err = ldlm_test(obddev, conn);
                CERROR("-- done err %d\n", err);
                GOTO(out, err);
        }
        case IOC_LDLM_REGRESS_START: {
                unsigned int numthreads = 1;
                unsigned int numheld = 10;
                unsigned int numres = 10;
                unsigned int numext = 10;
                char *parse;

                if (data->ioc_inllen1) {
                        parse = data->ioc_inlbuf1;
                        if (*parse != '\0') {
                                while (isspace(*parse)) parse++;
                                numthreads = simple_strtoul(parse, &parse, 0);
                                while (isspace(*parse)) parse++;
                        }
                        if (*parse != '\0') {
                                while (isspace(*parse)) parse++;
                                numheld = simple_strtoul(parse, &parse, 0);
                                while (isspace(*parse)) parse++;
                        }
                        if (*parse != '\0') {
                                while (isspace(*parse)) parse++;
                                numres = simple_strtoul(parse, &parse, 0);
                                while (isspace(*parse)) parse++;
                        }
                        if (*parse != '\0') {
                                while (isspace(*parse)) parse++;
                                numext = simple_strtoul(parse, &parse, 0);
                                while (isspace(*parse)) parse++;
                        }
                }

                err = ldlm_regression_start(obddev, conn, numthreads,
                                            numheld, numres, numext);

                CERROR("-- done err %d\n", err);
                GOTO(out, err);
        }
        case IOC_LDLM_REGRESS_STOP: {
                err = ldlm_regression_stop();
                CERROR("-- done err %d\n", err);
                GOTO(out, err);
        }
        case IOC_OSC_REGISTER_LOV: {
                if (obddev->u.cli.cl_containing_lov)
                        GOTO(out, err = -EALREADY);
                obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
                GOTO(out, err);
        }
        case OBD_IOC_LOV_GET_CONFIG: {
                char *buf;
                struct lov_desc *desc;
                obd_uuid_t *uuidp;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                if (sizeof(*desc) > data->ioc_inllen1) {
                        OBD_FREE(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(*uuidp)) {
                        OBD_FREE(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(desc->ld_uuid, obddev->obd_uuid, sizeof(*uuidp));

                uuidp = (obd_uuid_t *)data->ioc_inlbuf2;
                memcpy(uuidp, obddev->obd_uuid, sizeof(*uuidp));

                err = copy_to_user((void *)uarg, buf, len);
                OBD_FREE(buf, len);
                GOTO(out, err);
        }
        default:
                GOTO(out, err = -ENOTTY);
        }
out:
        return err;
}

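/* /proc registration: attach/detach hook the obd_device up to the lprocfs
 * status variables declared at the top of this file. */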
int osc_attach(struct obd_device *dev, obd_count len, void *data)
{
        return lprocfs_reg_obd(dev, status_var_nm_1, dev);
}

int osc_detach(struct obd_device *dev)
{
        return lprocfs_dereg_obd(dev);
}

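/*
 * Method table wiring the generic OBD class API to this module: connect,
 * disconnect, setup and cleanup are delegated to the generic client_obd_*
 * helpers, while the object, I/O and lock operations use the osc_*
 * implementations above.
 */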
struct obd_ops osc_obd_ops = {
        o_attach:       osc_attach,
        o_detach:       osc_detach,
        o_setup:        client_obd_setup,
        o_cleanup:      client_obd_cleanup,
        o_statfs:       osc_statfs,
        o_create:       osc_create,
        o_destroy:      osc_destroy,
        o_getattr:      osc_getattr,
        o_setattr:      osc_setattr,
        o_open:         osc_open,
        o_close:        osc_close,
        o_connect:      client_obd_connect,
        o_disconnect:   client_obd_disconnect,
        o_brw:          osc_brw,
        o_punch:        osc_punch,
        o_enqueue:      osc_enqueue,
        o_cancel:       osc_cancel,
        o_cancel_unused: osc_cancel_unused,
        o_iocontrol:    osc_iocontrol
};

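/* Module init/exit: register (and unregister) the OSC device type and its
 * lprocfs class variables with the obdclass driver. */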
static int __init osc_init(void)
{
        int rc;
        rc = class_register_type(&osc_obd_ops, status_class_var,
                                 LUSTRE_OSC_NAME);
        RETURN(rc);
}

static void __exit osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);