Whamcloud - gitweb
22f3dcdf843340df1e5b8c6e467f28aae9d82287
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
66
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
69
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT(lsm->lsm_object_gr);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
102         }
103
104         RETURN(lmm_size);
105 }
106
/* Unpack OSC object metadata from disk storage (LE byte order).
 *
 * Mirror of osc_packmd(); the NULL-ness of the arguments selects the
 * operation:
 *   lsmp == NULL                : size query, return single-stripe lsm size
 *   *lsmp != NULL, lmm == NULL  : free *lsmp (and its oinfo), return 0
 *   *lsmp == NULL               : allocate a new lsm plus one lov_oinfo
 *   lmm != NULL                 : copy object id/group from @lmm into *lsmp
 * Returns the lsm size on success or negative errno. */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                /* Reject a wire/disk buffer too small for the fixed header. */
                if (lmm_bytes < sizeof (*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        /* An OSC lsm always describes exactly one stripe. */
        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                /* Free path: release the oinfo first, then the lsm itself. */
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        /* Undo the lsm allocation on partial failure. */
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT((*lsmp)->lsm_object_gr);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         body->oa = *oinfo->oi_oa;
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(struct ptlrpc_request *req,
204                                  struct osc_async_args *aa, int rc)
205 {
206         struct ost_body *body;
207         ENTRY;
208
209         if (rc != 0)
210                 GOTO(out, rc);
211
212         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
213                                   lustre_swab_ost_body);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
264 {
265         struct ptlrpc_request *req;
266         struct ost_body       *body;
267         int                    rc;
268         ENTRY;
269
270         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
271         if (req == NULL)
272                 RETURN(-ENOMEM);
273
274         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276         if (rc) {
277                 ptlrpc_request_free(req);
278                 RETURN(rc);
279         }
280
281         osc_pack_req_body(req, oinfo);
282
283         ptlrpc_request_set_replen(req);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294         *oinfo->oi_oa = body->oa;
295
296         /* This should really be sent by the OST */
297         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
299
300         EXIT;
301  out:
302         ptlrpc_req_finished(req);
303         return rc;
304 }
305
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307                        struct obd_trans_info *oti)
308 {
309         struct ptlrpc_request *req;
310         struct ost_body       *body;
311         int                    rc;
312         ENTRY;
313
314         LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
315                                         oinfo->oi_oa->o_gr > 0);
316
317         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
318         if (req == NULL)
319                 RETURN(-ENOMEM);
320
321         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
322         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
323         if (rc) {
324                 ptlrpc_request_free(req);
325                 RETURN(rc);
326         }
327
328         osc_pack_req_body(req, oinfo);
329
330         ptlrpc_request_set_replen(req);
331
332         rc = ptlrpc_queue_wait(req);
333         if (rc)
334                 GOTO(out, rc);
335
336         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
337         if (body == NULL)
338                 GOTO(out, rc = -EPROTO);
339
340         *oinfo->oi_oa = body->oa;
341
342         EXIT;
343 out:
344         ptlrpc_req_finished(req);
345         RETURN(rc);
346 }
347
348 static int osc_setattr_interpret(struct ptlrpc_request *req,
349                                  struct osc_async_args *aa, int rc)
350 {
351         struct ost_body *body;
352         ENTRY;
353
354         if (rc != 0)
355                 GOTO(out, rc);
356
357         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
358         if (body == NULL)
359                 GOTO(out, rc = -EPROTO);
360
361         *aa->aa_oi->oi_oa = body->oa;
362 out:
363         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
364         RETURN(rc);
365 }
366
/* Asynchronous OST_SETATTR.
 *
 * With a NULL @rqset the request is handed to ptlrpcd fire-and-forget;
 * otherwise it is added to @rqset and osc_setattr_interpret() copies the
 * reply back into @oinfo.  When the obdo carries OBD_MD_FLCOOKIE the llog
 * cookie from @oti is embedded so the OST can cancel the matching log
 * record after commit.  Returns 0 on enqueue or negative errno. */
static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* Embed the llog cookie in the obdo (obdo_logcookie aliases space
         * inside the oa) so the OST can cancel the unlink-log record. */
        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                LASSERT(oti);
                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
        }

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                aa = ptlrpc_req_async_args(req);
                aa->aa_oi = oinfo;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
412
/* Create an object on the OST (synchronous OST_CREATE).
 *
 * If *ea is NULL a temporary lsm is allocated here and either returned via
 * *ea on success or freed on failure.  On success the server-assigned
 * object id/group are copied into both *oa and the lsm, and (if @oti is
 * given) the transaction number and llog cookie are recorded for recovery.
 * Returns 0 or negative errno. */
int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                /* Caller supplied no striping metadata: allocate our own;
                 * freed below on any error since *ea stays NULL then. */
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        /* OBD_MD_FLINLINE here marks an orphan-deletion request from
         * MDS/OST integration, not a normal create. */
        if (oa->o_valid & OBD_MD_FLINLINE) {
                LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
                        oa->o_flags == OBD_FL_DELORPHAN);
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        *oa = body->oa;

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_gr = oa->o_gr;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                /* Record the create llog cookie for later cancellation. */
                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = *obdo_logcookie(oa);
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        /* Only free the lsm if we allocated it here (caller's *ea unset). */
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
498
499 static int osc_punch_interpret(struct ptlrpc_request *req,
500                                struct osc_async_args *aa, int rc)
501 {
502         struct ost_body *body;
503         ENTRY;
504
505         if (rc != 0)
506                 GOTO(out, rc);
507
508         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
509         if (body == NULL)
510                 GOTO(out, rc = -EPROTO);
511
512         *aa->aa_oi->oi_oa = body->oa;
513 out:
514         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
515         RETURN(rc);
516 }
517
/* Asynchronous OST_PUNCH (truncate a byte range of the object).
 *
 * The punch extent comes from oinfo->oi_policy.l_extent; it is smuggled to
 * the server in the oa's o_size/o_blocks fields (an intentional protocol
 * overload).  The request is added to @rqset and completed by
 * osc_punch_interpret().  Returns 0 on enqueue or negative errno. */
static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);
        osc_pack_req_body(req, oinfo);

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa.o_size = oinfo->oi_policy.l_extent.start;
        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        ptlrpc_request_set_replen(req);

        /* The reply obdo is copied back in osc_punch_interpret(). */
        req->rq_interpret_reply = osc_punch_interpret;
        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;
        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}
564
/* Synchronous OST_SYNC: ask the OST to flush the byte range
 * [@start, @end] of the object described by @oa to stable storage.
 *
 * Like osc_punch(), the range is carried in the oa's o_size/o_blocks
 * fields (protocol overload).  On success the reply obdo is copied back
 * into *oa.  Returns 0 or negative errno. */
static int osc_sync(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md *md, obd_size start, obd_size end,
                    void *capa)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, capa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *oa = body->oa;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}
616
617 /* Find and cancel locally locks matched by @mode in the resource found by
618  * @objid. Found locks are added into @cancel list. Returns the amount of
619  * locks added to @cancels list. */
620 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
621                                    struct list_head *cancels, ldlm_mode_t mode,
622                                    int lock_flags)
623 {
624         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
625         struct ldlm_res_id res_id;
626         struct ldlm_resource *res;
627         int count;
628         ENTRY;
629
630         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
631         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
632         if (res == NULL)
633                 RETURN(0);
634
635         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
636                                            lock_flags, 0, NULL);
637         ldlm_resource_putref(res);
638         RETURN(count);
639 }
640
641 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
642                                  int rc)
643 {
644         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
645
646         atomic_dec(&cli->cl_destroy_in_flight);
647         cfs_waitq_signal(&cli->cl_destroy_waitq);
648         return 0;
649 }
650
/* Try to reserve an in-flight slot for a destroy RPC.
 *
 * Optimistically bumps cl_destroy_in_flight; if that stays within
 * cl_max_rpcs_in_flight the slot is ours (return 1).  Otherwise the bump
 * is undone, and because another thread may have decremented concurrently
 * between the two atomics, a waiter is woken to re-check (return 0). */
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
668
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        /* Cancel any local PW locks on the doomed object up front and
         * piggyback the cancels on the destroy (early lock cancellation);
         * DISCARD_DATA avoids flushing dirty pages that will be deleted. */
        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                /* We own the cancel list until it is handed to the request. */
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        req->rq_interpret_reply = osc_destroy_interpret;
        ptlrpc_at_set_req_timeout(req);

        /* Carry the unlink llog cookie so the OST can cancel the MDS's
         * recovery-log record once the destroy commits. */
        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                       sizeof(*oti->oti_logcookies));
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa = *oa;

        ptlrpc_request_set_replen(req);

        /* Throttle: don't let destroys monopolize every RPC slot. */
        if (!osc_can_send_destroy(cli)) {
                struct l_wait_info lwi = { 0 };

                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight
                 */
                l_wait_event_exclusive(cli->cl_destroy_waitq,
                                       osc_can_send_destroy(cli), &lwi);
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}
739
/* Report this client's cached-dirty and grant state to the server inside
 * @oa (piggybacked on a BRW).  Fills o_dirty, o_undirty (how much more the
 * client could dirty), o_grant and o_dropped, and resets cl_lost_grant.
 * All accounting is done under cl_loi_list_lock. */
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        /* Caller must not have set these fields already. */
        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty > cli->cl_dirty_max) {
                /* Accounting went wrong; claim no headroom. */
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
                CERROR("dirty %d > system dirty_max %d\n",
                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                /* Difference would not fit the wire field sanely. */
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
774
/* Charge one page of write grant to @pga: bump global/per-client dirty
 * accounting and mark the page as covered by grant.
 * caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        /* Remember that this page holds grant, for osc_release_write_grant. */
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        /* Caller must have checked grant availability before consuming. */
        LASSERT(cli->cl_avail_grant >= 0);
}
787
/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held.
 *
 * @sent says whether the page actually went to the OST: if it did not, the
 * whole page's grant is lost; if it did but was a short (sub-page) write,
 * only the part the server will not account for is lost.  Lost grant is
 * accumulated in cl_lost_grant and reported via o_dropped later. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        /* Server filesystem block size; default to 4k if not yet known. */
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        /* Pages never charged grant have nothing to release. */
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
826
827 static unsigned long rpcs_in_flight(struct client_obd *cli)
828 {
829         return cli->cl_r_in_flight + cli->cl_w_in_flight;
830 }
831
/* Wake threads queued on cl_cache_waiters waiting for dirty-cache room or
 * write grant.  Stops early (leaving remaining waiters queued) as soon as
 * no more room or grant can be handed out.  Waiters left with
 * ocw_rc == -EDQUOT must fall back to synchronous IO.
 * caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        struct list_head *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant wait for pending RPCs that
                 * may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        /* Grant available: charge it to the waiter's page. */
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}
873
/* Initialize this client's available write grant from the amount the
 * OST offered at connect time (ocd_grant).  cl_avail_grant is protected
 * by cl_loi_list_lock, hence the lock around the assignment. */
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_avail_grant, cli->cl_lost_grant);
        /* a server should never hand us a negative grant */
        LASSERT(cli->cl_avail_grant >= 0);
}
884
885 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
886 {
887         client_obd_list_lock(&cli->cl_loi_list_lock);
888         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
889         if (body->oa.o_valid & OBD_MD_FLGRANT)
890                 cli->cl_avail_grant += body->oa.o_grant;
891         /* waiters are woken in brw_interpret */
892         client_obd_list_unlock(&cli->cl_loi_list_lock);
893 }
894
895 /* We assume that the reason this OSC got a short read is because it read
896  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
897  * via the LOV, and it _knows_ it's reading inside the file, it's just that
898  * this stripe never got written at or beyond this stripe offset yet. */
899 static void handle_short_read(int nob_read, obd_count page_count,
900                               struct brw_page **pga)
901 {
902         char *ptr;
903         int i = 0;
904
905         /* skip bytes read OK */
906         while (nob_read > 0) {
907                 LASSERT (page_count > 0);
908
909                 if (pga[i]->count > nob_read) {
910                         /* EOF inside this page */
911                         ptr = cfs_kmap(pga[i]->pg) +
912                                 (pga[i]->off & ~CFS_PAGE_MASK);
913                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
914                         cfs_kunmap(pga[i]->pg);
915                         page_count--;
916                         i++;
917                         break;
918                 }
919
920                 nob_read -= pga[i]->count;
921                 page_count--;
922                 i++;
923         }
924
925         /* zero remaining pages */
926         while (page_count-- > 0) {
927                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
928                 memset(ptr, 0, pga[i]->count);
929                 cfs_kunmap(pga[i]->pg);
930                 i++;
931         }
932 }
933
934 static int check_write_rcs(struct ptlrpc_request *req,
935                            int requested_nob, int niocount,
936                            obd_count page_count, struct brw_page **pga)
937 {
938         int    *remote_rcs, i;
939
940         /* return error if any niobuf was in error */
941         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
942                                         sizeof(*remote_rcs) * niocount, NULL);
943         if (remote_rcs == NULL) {
944                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
945                 return(-EPROTO);
946         }
947         if (lustre_msg_swabbed(req->rq_repmsg))
948                 for (i = 0; i < niocount; i++)
949                         __swab32s(&remote_rcs[i]);
950
951         for (i = 0; i < niocount; i++) {
952                 if (remote_rcs[i] < 0)
953                         return(remote_rcs[i]);
954
955                 if (remote_rcs[i] != 0) {
956                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
957                                 i, remote_rcs[i], req);
958                         return(-EPROTO);
959                 }
960         }
961
962         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
963                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
964                        requested_nob, req->rq_bulk->bd_nob_transferred);
965                 return(-EPROTO);
966         }
967
968         return (0);
969 }
970
971 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
972 {
973         if (p1->flag != p2->flag) {
974                 unsigned mask = ~OBD_BRW_FROM_GRANT;
975
976                 /* warn if we try to combine flags that we don't know to be
977                  * safe to combine */
978                 if ((p1->flag & mask) != (p2->flag & mask))
979                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
980                                "same brw?\n", p1->flag, p2->flag);
981                 return 0;
982         }
983
984         return (p1->off + p1->count == p2->off);
985 }
986
987 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
988                                    struct brw_page **pga, int opc,
989                                    cksum_type_t cksum_type)
990 {
991         __u32 cksum;
992         int i = 0;
993
994         LASSERT (pg_count > 0);
995         cksum = init_checksum(cksum_type);
996         while (nob > 0 && pg_count > 0) {
997                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
998                 int off = pga[i]->off & ~CFS_PAGE_MASK;
999                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1000
1001                 /* corrupt the data before we compute the checksum, to
1002                  * simulate an OST->client data error */
1003                 if (i == 0 && opc == OST_READ &&
1004                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1005                         memcpy(ptr + off, "bad1", min(4, nob));
1006                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1007                 cfs_kunmap(pga[i]->pg);
1008                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1009                                off, cksum);
1010
1011                 nob -= pga[i]->count;
1012                 pg_count--;
1013                 i++;
1014         }
1015         /* For sending we only compute the wrong checksum instead
1016          * of corrupting the data so it is still correct on a redo */
1017         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1018                 cksum++;
1019
1020         return cksum;
1021 }
1022
/* Build a bulk read/write (BRW) request for page_count pages described
 * by pga.  On success *reqp is set to the new request (with the bulk
 * descriptor attached and async args initialized) and the caller owns
 * it.  Contiguous, flag-compatible pages are merged into a single
 * niobuf_remote.  Returns 0 or a negative errno; the first fail-check
 * returns -ENOMEM, which callers treat as recoverable, the second
 * -EINVAL, which is fatal. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        /* writes draw from the import's emergency request pool so they can
         * still make progress (and free memory) under memory pressure */
        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
        }

        if (req == NULL)
                RETURN(-ENOMEM);

        /* count distinct niobufs: one more each time adjacent pages
         * cannot be merged */
        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        /* size the variable-length request fields before packing */
        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body && ioobj && niobuf);

        body->oa = *oa;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT (page_count > 0);
        pg_prev = pga[0];
        /* register each page with the bulk descriptor and fill the niobuf
         * array, merging contiguous pages into the previous niobuf */
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                /* each brw_page must stay within a single page */
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                /* pages must arrive in strictly ascending offset order */
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                /* either every page is server-locked or none is */
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        /* sanity: we must have filled exactly niocount niobufs */
        LASSERTF((void *)(niobuf - niocount) ==
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                               niocount * sizeof(*niobuf)),
                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                /* only checksum in software when the bulk transport is not
                 * already doing integrity (BULK_HASH_ALG_NULL) */
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                oa->o_flags = body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                /* read: ask the server to checksum what it sends back */
                if (unlikely(cli->cl_checksum) &&
                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
                req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
                /* 1 RC for the whole I/O */
        }
        ptlrpc_request_set_replen(req);

        /* stash everything brw_interpret/fini will need in the async args */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1206
/* Diagnose a write checksum mismatch.  Recomputes the checksum over the
 * local pages and compares it against both the value we originally sent
 * (client_cksum) and the one the server computed (server_cksum) to work
 * out where the data changed.  Returns 0 when the checksums actually
 * match, 1 on a genuine mismatch (after logging the diagnosis). */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                                __u32 client_cksum, __u32 server_cksum, int nob,
                                obd_count page_count, struct brw_page **pga,
                                cksum_type_t client_cksum_type)
{
        __u32 new_cksum;
        char *msg;
        cksum_type_t cksum_type;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        /* recompute with the algorithm the server reported, falling back
         * to CRC32 when the reply carried no flags */
        if (oa->o_valid & OBD_MD_FLFLAGS)
                cksum_type = cksum_type_unpack(oa->o_flags);
        else
                cksum_type = OBD_CKSUM_CRC32;

        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
                                      cksum_type);

        /* triangulate: which of the three checksums agree tells us where
         * the data was modified */
        if (cksum_type != client_cksum_type)
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
                           "["LPU64"-"LPU64"]\n",
                           msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                        (__u64)0,
                           oa->o_id,
                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
                           pga[0]->off,
                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
        CERROR("original client csum %x (type %x), server csum %x (type %x), "
               "client csum now %x\n", client_cksum, client_cksum_type,
               server_cksum, cksum_type, new_cksum);
        return 1;
}
1257
1258 /* Note rc enters this function as number of bytes transferred */
1259 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1260 {
1261         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1262         const lnet_process_id_t *peer =
1263                         &req->rq_import->imp_connection->c_peer;
1264         struct client_obd *cli = aa->aa_cli;
1265         struct ost_body *body;
1266         __u32 client_cksum = 0;
1267         ENTRY;
1268
1269         if (rc < 0 && rc != -EDQUOT)
1270                 RETURN(rc);
1271
1272         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1273         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1274                                   lustre_swab_ost_body);
1275         if (body == NULL) {
1276                 CDEBUG(D_INFO, "Can't unpack body\n");
1277                 RETURN(-EPROTO);
1278         }
1279
1280         /* set/clear over quota flag for a uid/gid */
1281         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1282             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1283                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1284                              body->oa.o_gid, body->oa.o_valid,
1285                              body->oa.o_flags);
1286
1287         if (rc < 0)
1288                 RETURN(rc);
1289
1290         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1291                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1292
1293         osc_update_grant(cli, body);
1294
1295         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1296                 if (rc > 0) {
1297                         CERROR("Unexpected +ve rc %d\n", rc);
1298                         RETURN(-EPROTO);
1299                 }
1300                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1301
1302                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1303                     check_write_checksum(&body->oa, peer, client_cksum,
1304                                          body->oa.o_cksum, aa->aa_requested_nob,
1305                                          aa->aa_page_count, aa->aa_ppga,
1306                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1307                         RETURN(-EAGAIN);
1308
1309                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1310                         RETURN(-EAGAIN);
1311
1312                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1313                                      aa->aa_page_count, aa->aa_ppga);
1314                 GOTO(out, rc);
1315         }
1316
1317         /* The rest of this function executes only for OST_READs */
1318         if (rc > aa->aa_requested_nob) {
1319                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1320                        aa->aa_requested_nob);
1321                 RETURN(-EPROTO);
1322         }
1323
1324         if (rc != req->rq_bulk->bd_nob_transferred) {
1325                 CERROR ("Unexpected rc %d (%d transferred)\n",
1326                         rc, req->rq_bulk->bd_nob_transferred);
1327                 return (-EPROTO);
1328         }
1329
1330         if (rc < aa->aa_requested_nob)
1331                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1332
1333         if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
1334                                          aa->aa_ppga))
1335                 GOTO(out, rc = -EAGAIN);
1336
1337         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1338                 static int cksum_counter;
1339                 __u32      server_cksum = body->oa.o_cksum;
1340                 char      *via;
1341                 char      *router;
1342                 cksum_type_t cksum_type;
1343
1344                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1345                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1346                 else
1347                         cksum_type = OBD_CKSUM_CRC32;
1348                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1349                                                  aa->aa_ppga, OST_READ,
1350                                                  cksum_type);
1351
1352                 if (peer->nid == req->rq_bulk->bd_sender) {
1353                         via = router = "";
1354                 } else {
1355                         via = " via ";
1356                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1357                 }
1358
1359                 if (server_cksum == ~0 && rc > 0) {
1360                         CERROR("Protocol error: server %s set the 'checksum' "
1361                                "bit, but didn't send a checksum.  Not fatal, "
1362                                "but please notify on http://bugzilla.lustre.org/\n",
1363                                libcfs_nid2str(peer->nid));
1364                 } else if (server_cksum != client_cksum) {
1365                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1366                                            "%s%s%s inum "LPU64"/"LPU64" object "
1367                                            LPU64"/"LPU64" extent "
1368                                            "["LPU64"-"LPU64"]\n",
1369                                            req->rq_import->imp_obd->obd_name,
1370                                            libcfs_nid2str(peer->nid),
1371                                            via, router,
1372                                            body->oa.o_valid & OBD_MD_FLFID ?
1373                                                 body->oa.o_fid : (__u64)0,
1374                                            body->oa.o_valid & OBD_MD_FLFID ?
1375                                                 body->oa.o_generation :(__u64)0,
1376                                            body->oa.o_id,
1377                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1378                                                 body->oa.o_gr : (__u64)0,
1379                                            aa->aa_ppga[0]->off,
1380                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1381                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1382                                                                         1);
1383                         CERROR("client %x, server %x, cksum_type %x\n",
1384                                client_cksum, server_cksum, cksum_type);
1385                         cksum_counter = 0;
1386                         aa->aa_oa->o_cksum = client_cksum;
1387                         rc = -EAGAIN;
1388                 } else {
1389                         cksum_counter++;
1390                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1391                         rc = 0;
1392                 }
1393         } else if (unlikely(client_cksum)) {
1394                 static int cksum_missed;
1395
1396                 cksum_missed++;
1397                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1398                         CERROR("Checksum %u requested from %s but not sent\n",
1399                                cksum_missed, libcfs_nid2str(peer->nid));
1400         } else {
1401                 rc = 0;
1402         }
1403 out:
1404         if (rc >= 0)
1405                 *aa->aa_oa = body->oa;
1406
1407         RETURN(rc);
1408 }
1409
1410 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1411                             struct lov_stripe_md *lsm,
1412                             obd_count page_count, struct brw_page **pga,
1413                             struct obd_capa *ocapa)
1414 {
1415         struct ptlrpc_request *req;
1416         int                    rc;
1417         cfs_waitq_t            waitq;
1418         int                    resends = 0;
1419         struct l_wait_info     lwi;
1420
1421         ENTRY;
1422
1423         cfs_waitq_init(&waitq);
1424
1425 restart_bulk:
1426         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1427                                   page_count, pga, &req, ocapa);
1428         if (rc != 0)
1429                 return (rc);
1430
1431         rc = ptlrpc_queue_wait(req);
1432
1433         if (rc == -ETIMEDOUT && req->rq_resend) {
1434                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1435                 ptlrpc_req_finished(req);
1436                 goto restart_bulk;
1437         }
1438
1439         rc = osc_brw_fini_request(req, rc);
1440
1441         ptlrpc_req_finished(req);
1442         if (osc_recoverable_error(rc)) {
1443                 resends++;
1444                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1445                         CERROR("too many resend retries, returning error\n");
1446                         RETURN(-EIO);
1447                 }
1448
1449                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1450                 l_wait_event(waitq, 0, &lwi);
1451
1452                 goto restart_bulk;
1453         }
1454
1455         RETURN (rc);
1456 }
1457
/* Rebuild a failed BRW request and queue the replacement on the same
 * request set.  The new request takes over the page array and the list
 * of async pages (oaps) from the old one.  Returns 0 on success, -EIO
 * once the resend limit is exceeded, -EINTR if any oap's IO was
 * interrupted, or the error from rebuilding the request. */
int osc_brw_redo_request(struct ptlrpc_request *request,
                         struct osc_brw_async_args *aa)
{
        struct ptlrpc_request *new_req;
        struct ptlrpc_request_set *set = request->rq_set;
        struct osc_brw_async_args *new_aa;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
                CERROR("too many resend retries, returning error\n");
                RETURN(-EIO);
        }

        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
/*
        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                ocapa = lustre_unpack_capa(request->rq_reqmsg,
                                           REQ_REC_OFF + 3);
*/
        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
                                  aa->aa_cli, aa->aa_oa,
                                  NULL /* lsm unused by osc currently */,
                                  aa->aa_page_count, aa->aa_ppga,
                                  &new_req, NULL /* ocapa */);
        if (rc)
                RETURN(rc);

        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);

        /* abort the redo if any page's IO was interrupted meanwhile */
        list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request != NULL) {
                        LASSERTF(request == oap->oap_request,
                                 "request %p != oap_request %p\n",
                                 request, oap->oap_request);
                        if (oap->oap_interrupted) {
                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                ptlrpc_req_finished(new_req);
                                RETURN(-EINTR);
                        }
                }
        }
        /* New request takes over pga and oaps from old request.
         * Note that copying a list_head doesn't work, need to move it... */
        aa->aa_resends++;
        new_req->rq_interpret_reply = request->rq_interpret_reply;
        new_req->rq_async_args = request->rq_async_args;
        /* delay the resend; the delay grows with the resend count */
        new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;

        new_aa = ptlrpc_req_async_args(new_req);

        CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
        list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);

        /* repoint each oap's request reference at the new request */
        list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                if (oap->oap_request) {
                        ptlrpc_req_finished(oap->oap_request);
                        oap->oap_request = ptlrpc_request_addref(new_req);
                }
        }

        /* use ptlrpc_set_add_req is safe because interpret functions work
         * in check_set context. only one way exist with access to request
         * from different thread got -EINTR - this way protected with
         * cl_loi_list_lock */
        ptlrpc_set_add_req(set, new_req);

        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);

        DEBUG_REQ(D_INFO, new_req, "new request");
        RETURN(0);
}
1534
1535 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1536                           struct lov_stripe_md *lsm, obd_count page_count,
1537                           struct brw_page **pga, struct ptlrpc_request_set *set,
1538                           struct obd_capa *ocapa)
1539 {
1540         struct ptlrpc_request     *req;
1541         struct client_obd         *cli = &exp->exp_obd->u.cli;
1542         int                        rc, i;
1543         struct osc_brw_async_args *aa;
1544         ENTRY;
1545
1546         /* Consume write credits even if doing a sync write -
1547          * otherwise we may run out of space on OST due to grant. */
1548         if (cmd == OBD_BRW_WRITE) {
1549                 spin_lock(&cli->cl_loi_list_lock);
1550                 for (i = 0; i < page_count; i++) {
1551                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1552                                 osc_consume_write_grant(cli, pga[i]);
1553                 }
1554                 spin_unlock(&cli->cl_loi_list_lock);
1555         }
1556
1557         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
1558                                   &req, ocapa);
1559
1560         aa = ptlrpc_req_async_args(req);
1561         if (cmd == OBD_BRW_READ) {
1562                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1563                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1564         } else {
1565                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1566                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1567                                  cli->cl_w_in_flight);
1568         }
1569         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
1570
1571         LASSERT(list_empty(&aa->aa_oaps));
1572         if (rc == 0) {
1573                 req->rq_interpret_reply = brw_interpret;
1574                 ptlrpc_set_add_req(set, req);
1575                 client_obd_list_lock(&cli->cl_loi_list_lock);
1576                 if (cmd == OBD_BRW_READ)
1577                         cli->cl_r_in_flight++;
1578                 else
1579                         cli->cl_w_in_flight++;
1580                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1581                 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1582         } else if (cmd == OBD_BRW_WRITE) {
1583                 client_obd_list_lock(&cli->cl_loi_list_lock);
1584                 for (i = 0; i < page_count; i++)
1585                         osc_release_write_grant(cli, pga[i], 0);
1586                 osc_wake_cache_waiters(cli);
1587                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1588         }
1589         RETURN (rc);
1590 }
1591
1592 /*
1593  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1594  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1595  * fine for our small page arrays and doesn't require allocation.  its an
1596  * insertion sort that swaps elements that are strides apart, shrinking the
1597  * stride down until its '1' and the array is sorted.
1598  */
1599 static void sort_brw_pages(struct brw_page **array, int num)
1600 {
1601         int stride, i, j;
1602         struct brw_page *tmp;
1603
1604         if (num == 1)
1605                 return;
1606         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1607                 ;
1608
1609         do {
1610                 stride /= 3;
1611                 for (i = stride ; i < num ; i++) {
1612                         tmp = array[i];
1613                         j = i;
1614                         while (j >= stride && array[j - stride]->off > tmp->off) {
1615                                 array[j] = array[j - stride];
1616                                 j -= stride;
1617                         }
1618                         array[j] = tmp;
1619                 }
1620         } while (stride > 1);
1621 }
1622
1623 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1624 {
1625         int count = 1;
1626         int offset;
1627         int i = 0;
1628
1629         LASSERT (pages > 0);
1630         offset = pg[i]->off & ~CFS_PAGE_MASK;
1631
1632         for (;;) {
1633                 pages--;
1634                 if (pages == 0)         /* that's all */
1635                         return count;
1636
1637                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1638                         return count;   /* doesn't end on page boundary */
1639
1640                 i++;
1641                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1642                 if (offset != 0)        /* doesn't start on page boundary */
1643                         return count;
1644
1645                 count++;
1646         }
1647 }
1648
1649 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1650 {
1651         struct brw_page **ppga;
1652         int i;
1653
1654         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1655         if (ppga == NULL)
1656                 return NULL;
1657
1658         for (i = 0; i < count; i++)
1659                 ppga[i] = pga + i;
1660         return ppga;
1661 }
1662
/* Free a pointer array previously allocated by osc_build_ppga().
 * @count must be the same count that was passed to osc_build_ppga(),
 * even if callers have since advanced their working pointer into it. */
static void osc_release_ppga(struct brw_page **ppga, obd_count count)
{
        LASSERT(ppga != NULL);
        OBD_FREE(ppga, sizeof(*ppga) * count);
}
1668
/* Synchronous bulk read/write entry point.  Builds a sortable pointer
 * array over the caller's pages, sorts it by file offset, and issues
 * the transfer as one or more RPCs, each capped at cl_max_pages_per_rpc
 * pages and split at any fragmentation point (see
 * max_unfragmented_pages()).  Returns 0 or a negative errno. */
static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                   obd_count page_count, struct brw_page *pga,
                   struct obd_trans_info *oti)
{
        struct obdo *saved_oa = NULL;
        struct brw_page **ppga, **orig;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct client_obd *cli = &imp->imp_obd->u.cli;
        int rc, page_count_orig;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        /* test_brw with a failed create can trip this, maybe others. */
        LASSERT(cli->cl_max_pages_per_rpc);

        rc = 0;

        /* keep the original pointer/count for the final release since the
         * working ppga/page_count are advanced inside the loop below */
        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                obd_count pages_per_brw;

                if (page_count > cli->cl_max_pages_per_rpc)
                        pages_per_brw = cli->cl_max_pages_per_rpc;
                else
                        pages_per_brw = page_count;

                /* shrink further so each RPC covers one contiguous run */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                if (saved_oa != NULL) {
                        /* restore previously saved oa */
                        *oinfo->oi_oa = *saved_oa;
                } else if (page_count > pages_per_brw) {
                        /* save a copy of oa (brw will clobber it) */
                        OBDO_ALLOC(saved_oa);
                        if (saved_oa == NULL)
                                GOTO(out, rc = -ENOMEM);
                        *saved_oa = *oinfo->oi_oa;
                }

                rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                      pages_per_brw, ppga, oinfo->oi_capa);

                if (rc != 0)
                        break;

                /* advance to the next chunk */
                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }

out:
        osc_release_ppga(orig, page_count_orig);

        if (saved_oa != NULL)
                OBDO_FREE(saved_oa);

        RETURN(rc);
}
1739
/* Asynchronous counterpart of osc_brw(): splits the page array into
 * RPC-sized chunks and queues each via async_internal() on @set.
 *
 * Ownership of the page-pointer array is subtle: if the whole transfer
 * fits in a single RPC, the original ppga is handed to async_internal()
 * (freed later by brw_interpret()); otherwise per-chunk copies are
 * allocated and the original is freed here at "out". */
static int osc_brw_async(int cmd, struct obd_export *exp,
                         struct obd_info *oinfo, obd_count page_count,
                         struct brw_page *pga, struct obd_trans_info *oti,
                         struct ptlrpc_request_set *set)
{
        struct brw_page **ppga, **orig;
        struct client_obd *cli = &exp->exp_obd->u.cli;
        int page_count_orig;
        int rc = 0;
        ENTRY;

        if (cmd & OBD_BRW_CHECK) {
                struct obd_import *imp = class_exp2cliimp(exp);
                /* The caller just wants to know if there's a chance that this
                 * I/O can succeed */

                if (imp == NULL || imp->imp_invalid)
                        RETURN(-EIO);
                RETURN(0);
        }

        orig = ppga = osc_build_ppga(pga, page_count);
        if (ppga == NULL)
                RETURN(-ENOMEM);
        page_count_orig = page_count;

        sort_brw_pages(ppga, page_count);
        while (page_count) {
                struct brw_page **copy;
                obd_count pages_per_brw;

                pages_per_brw = min_t(obd_count, page_count,
                                      cli->cl_max_pages_per_rpc);

                /* shrink further so each RPC covers one contiguous run */
                pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);

                /* use ppga only if single RPC is going to fly */
                if (pages_per_brw != page_count_orig || ppga != orig) {
                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                        if (copy == NULL)
                                GOTO(out, rc = -ENOMEM);
                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                } else
                        copy = ppga;

                rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
                                    pages_per_brw, copy, set, oinfo->oi_capa);

                if (rc != 0) {
                        /* a per-chunk copy was never handed off; free it.
                         * copy == ppga (== orig) is freed at "out" below */
                        if (copy != ppga)
                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                        break;
                }
                if (copy == orig) {
                        /* we passed it to async_internal() which is
                         * now responsible for releasing memory */
                        orig = NULL;
                }

                page_count -= pages_per_brw;
                ppga += pages_per_brw;
        }
out:
        if (orig)
                osc_release_ppga(orig, page_count_orig);
        RETURN(rc);
}
1807
1808 static void osc_check_rpcs(struct client_obd *cli);
1809
/* The companion to osc_enter_cache(), called when @oap is no longer part of
 * the dirty accounting.  Writeback completes or truncate happens before
 * writing starts.  Must be called with the loi lock held. */
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
                           int sent)
{
        /* return the page's write grant to the client's pool */
        osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
1818
1819
/* This maintains the lists of pending pages to read/write for a given object
 * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
 * to quickly find objects that are ready to send an RPC.
 *
 * Returns non-zero when an RPC should be fired for @lop in direction @cmd,
 * zero when it is worth waiting for more pages to batch. */
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         int cmd)
{
        int optimal;
        ENTRY;

        /* nothing queued at all - nothing to send */
        if (lop->lop_num_pending == 0)
                RETURN(0);

        /* if we have an invalid import we want to drain the queued pages
         * by forcing them through rpcs that immediately fail and complete
         * the pages.  recovery relies on this to empty the queued pages
         * before canceling the locks and evicting down the llite pages */
        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(1);

        /* stream rpcs in queue order as long as as there is an urgent page
         * queued.  this is our cheap solution for good batching in the case
         * where writepage marks some random page in the middle of the file
         * as urgent because of, say, memory pressure */
        if (!list_empty(&lop->lop_urgent)) {
                CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                RETURN(1);
        }
        /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
        optimal = cli->cl_max_pages_per_rpc;
        if (cmd & OBD_BRW_WRITE) {
                /* trigger a write rpc stream as long as there are dirtiers
                 * waiting for space.  as they're waiting, they're not going to
                 * create more pages to coallesce with what's waiting.. */
                if (!list_empty(&cli->cl_cache_waiters)) {
                        CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                        RETURN(1);
                }
                /* +16 to avoid triggering rpcs that would want to include pages
                 * that are being queued but which can't be made ready until
                 * the queuer finishes with the page. this is a wart for
                 * llite::commit_write() */
                optimal += 16;
        }
        if (lop->lop_num_pending >= optimal)
                RETURN(1);

        RETURN(0);
}
1868
/* Reconcile @item's membership of @list with the desired state
 * @should_be_on: add it to the tail if it should be listed but isn't,
 * remove it if it is listed but shouldn't be, otherwise do nothing. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int currently_on = !list_empty(item);

        if (should_be_on && !currently_on)
                list_add_tail(item, list);
        else if (!should_be_on && currently_on)
                list_del_init(item);
}
1877
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
 * can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
        /* ready list: objects for which an RPC should fire now, in either
         * direction (see lop_makes_rpc() for the heuristics) */
        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));

        /* per-direction lists: objects with any pending pages at all */
        on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                loi->loi_write_lop.lop_num_pending);

        on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
                loi->loi_read_lop.lop_num_pending);
}
1892
/* Adjust the pending-page counters by @delta (positive when queuing,
 * negative when dequeuing): the per-object lop counter plus the matching
 * per-client read or write total, chosen by @cmd. */
static void lop_update_pending(struct client_obd *cli,
                               struct loi_oap_pages *lop, int cmd, int delta)
{
        lop->lop_num_pending += delta;
        if (cmd & OBD_BRW_WRITE)
                cli->cl_pending_w_pages += delta;
        else
                cli->cl_pending_r_pages += delta;
}
1902
/* this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has timed out */
static void osc_occ_interrupted(struct oig_callback_context *occ)
{
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        struct lov_oinfo *loi;
        ENTRY;

        /* XXX member_of() */
        oap = list_entry(occ, struct osc_async_page, oap_occ);

        client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);

        oap->oap_interrupted = 1;

        /* ok, it's been put in an rpc. only one oap gets a request reference */
        if (oap->oap_request != NULL) {
                ptlrpc_mark_interrupted(oap->oap_request);
                ptlrpcd_wake(oap->oap_request);
                GOTO(unlock, 0);
        }

        /* we don't get interruption callbacks until osc_trigger_group_io()
         * has been called and put the sync oaps in the pending/urgent lists.*/
        if (!list_empty(&oap->oap_pending_item)) {
                /* not in an rpc yet: pull it out of the queues, fix up the
                 * pending-page accounting, and complete it with -EINTR */
                list_del_init(&oap->oap_pending_item);
                list_del_init(&oap->oap_urgent_item);

                loi = oap->oap_loi;
                lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
                        &loi->loi_write_lop : &loi->loi_read_lop;
                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
                loi_list_maint(oap->oap_cli, oap->oap_loi);

                oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
                oap->oap_oig = NULL;
        }

unlock:
        client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
}
1948
1949 /* this is trying to propogate async writeback errors back up to the
1950  * application.  As an async write fails we record the error code for later if
1951  * the app does an fsync.  As long as errors persist we force future rpcs to be
1952  * sync so that the app can get a sync error and break the cycle of queueing
1953  * pages for which writeback will fail. */
1954 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1955                            int rc)
1956 {
1957         if (rc) {
1958                 if (!ar->ar_rc)
1959                         ar->ar_rc = rc;
1960
1961                 ar->ar_force_sync = 1;
1962                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1963                 return;
1964
1965         }
1966
1967         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1968                 ar->ar_force_sync = 0;
1969 }
1970
1971 static void osc_oap_to_pending(struct osc_async_page *oap)
1972 {
1973         struct loi_oap_pages *lop;
1974
1975         if (oap->oap_cmd & OBD_BRW_WRITE)
1976                 lop = &oap->oap_loi->loi_write_lop;
1977         else
1978                 lop = &oap->oap_loi->loi_read_lop;
1979
1980         if (oap->oap_async_flags & ASYNC_URGENT)
1981                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1982         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1983         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1984 }
1985
/* this must be called holding the loi list lock to give coverage to exit_cache,
 * async_flag maintenance, and oap_request */
static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                              struct osc_async_page *oap, int sent, int rc)
{
        __u64 xid = 0;

        ENTRY;
        /* capture the request xid for the async-error bookkeeping below,
         * then drop our reference on the request */
        if (oap->oap_request != NULL) {
                xid = ptlrpc_req_xid(oap->oap_request);
                ptlrpc_req_finished(oap->oap_request);
                oap->oap_request = NULL;
        }

        oap->oap_async_flags = 0;
        oap->oap_interrupted = 0;

        /* record write errors (per-client and per-object) so a later
         * fsync can see them and future rpcs go sync; see osc_process_ar() */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                osc_process_ar(&cli->cl_ar, xid, rc);
                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
        }

        /* on success, fold the returned attributes into the cached lvb */
        if (rc == 0 && oa != NULL) {
                if (oa->o_valid & OBD_MD_FLBLOCKS)
                        oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
                if (oa->o_valid & OBD_MD_FLMTIME)
                        oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
                if (oa->o_valid & OBD_MD_FLATIME)
                        oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
                if (oa->o_valid & OBD_MD_FLCTIME)
                        oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
        }

        /* group-io pages complete through the oig, not the caller ops */
        if (oap->oap_oig) {
                osc_exit_cache(cli, oap, sent);
                oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
                oap->oap_oig = NULL;
                EXIT;
                return;
        }

        rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
                                                oap->oap_cmd, oa, rc);

        /* ll_ap_completion (from llite) drops PG_locked. so, a new
         * I/O on the page could start, but OSC calls it under lock
         * and thus we can add oap back to pending safely */
        if (rc)
                /* upper layer wants to leave the page on pending queue */
                osc_oap_to_pending(oap);
        else
                osc_exit_cache(cli, oap, sent);
        EXIT;
}
2040
/* Reply-interpret callback for bulk read/write RPCs queued by
 * osc_send_oap_rpc() or async_internal().  Finalizes the transfer,
 * retries recoverable errors, updates the in-flight accounting, and
 * completes or releases the pages depending on which path queued them. */
static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_brw_async_args *aa = data;
        struct client_obd *cli;
        ENTRY;

        rc = osc_brw_fini_request(req, rc);
        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
        /* recoverable errors are resent as a fresh request; if the redo
         * was queued successfully we are done here */
        if (osc_recoverable_error(rc)) {
                rc = osc_brw_redo_request(req, aa);
                if (rc == 0)
                        RETURN(0);
        }

        cli = aa->aa_cli;

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
         * is called so we know whether to go to sync BRWs or wait for more
         * RPCs to complete */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                cli->cl_w_in_flight--;
        else
                cli->cl_r_in_flight--;

        if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
                struct osc_async_page *oap, *tmp;
                /* the caller may re-use the oap after the completion call so
                 * we need to clean it up a little */
                list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
                        list_del_init(&oap->oap_rpc_item);
                        osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
                }
                OBDO_FREE(aa->aa_oa);
        } else { /* from async_internal() */
                int i;
                for (i = 0; i < aa->aa_page_count; i++)
                        osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
        }
        osc_wake_cache_waiters(cli);
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        RETURN(rc);
}
2088
/* Build a bulk RPC from the oaps on @rpc_list: construct a page-pointer
 * array, fill the obdo from the top layer, sort the pages by offset and
 * prepare the ptlrpc request.  On success the oaps are moved onto the
 * request's async args (aa_oaps) and @rpc_list is left empty.
 * Returns the request or an ERR_PTR(); on error @rpc_list is untouched. */
static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                                            struct list_head *rpc_list,
                                            int page_count, int cmd)
{
        struct ptlrpc_request *req;
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa;
        struct obdo *oa = NULL;
        struct obd_async_page_ops *ops = NULL;
        void *caller_data = NULL;
        struct obd_capa *ocapa;
        struct osc_async_page *oap;
        int i, rc;

        ENTRY;
        LASSERT(!list_empty(rpc_list));

        OBD_ALLOC(pga, sizeof(*pga) * page_count);
        if (pga == NULL)
                RETURN(ERR_PTR(-ENOMEM));

        OBDO_ALLOC(oa);
        if (oa == NULL)
                GOTO(out, req = ERR_PTR(-ENOMEM));

        /* collect the pages; caller ops/data are taken from the first oap
         * (all oaps in one rpc share them) */
        i = 0;
        list_for_each_entry(oap, rpc_list, oap_rpc_item) {
                if (ops == NULL) {
                        ops = oap->oap_caller_ops;
                        caller_data = oap->oap_caller_data;
                }
                pga[i] = &oap->oap_brw_page;
                pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
                       pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
                i++;
        }

        /* always get the data for the obdo for the rpc */
        LASSERT(ops != NULL);
        ops->ap_fill_obdo(caller_data, cmd, oa);
        ocapa = ops->ap_lookup_capa(caller_data, cmd);

        sort_brw_pages(pga, page_count);
        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
                                  pga, &req, ocapa);
        capa_put(ocapa);
        if (rc != 0) {
                CERROR("prep_req failed: %d\n", rc);
                GOTO(out, req = ERR_PTR(rc));
        }

        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
         * later setattr before earlier BRW (as determined by the request xid),
         * the OST will not use BRW timestamps.  Sadly, there is no obvious
         * way to do this in a single call.  bug 10150 */
        ops->ap_update_obdo(caller_data, cmd, oa,
                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

        /* hand the oaps over to the request; brw_interpret() completes
         * them and frees oa/pga when the reply arrives */
        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        list_splice(rpc_list, &aa->aa_oaps);
        CFS_INIT_LIST_HEAD(rpc_list);

out:
        /* on any error path, free what we allocated here */
        if (IS_ERR(req)) {
                if (oa)
                        OBDO_FREE(oa);
                if (pga)
                        OBD_FREE(pga, sizeof(*pga) * page_count);
        }
        RETURN(req);
}
2164
2165 /* the loi lock is held across this function but it's allowed to release
2166  * and reacquire it during its work */
2167 /**
2168  * prepare pages for ASYNC io and put pages in send queue.
2169  *
2170  * \param cli -
2171  * \param loi -
2172  * \param cmd - OBD_BRW_* macroses
2173  * \param lop - pending pages
2174  *
2175  * \return zero if pages successfully add to send queue.
2176  * \return not zere if error occurring.
2177  */
2178 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2179                             int cmd, struct loi_oap_pages *lop)
2180 {
2181         struct ptlrpc_request *req;
2182         obd_count page_count = 0;
2183         struct osc_async_page *oap = NULL, *tmp;
2184         struct osc_brw_async_args *aa;
2185         struct obd_async_page_ops *ops;
2186         CFS_LIST_HEAD(rpc_list);
2187         unsigned int ending_offset;
2188         unsigned  starting_offset = 0;
2189         int srvlock = 0;
2190         ENTRY;
2191
2192         /* first we find the pages we're allowed to work with */
2193         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2194                                  oap_pending_item) {
2195                 ops = oap->oap_caller_ops;
2196
2197                 LASSERT(oap->oap_magic == OAP_MAGIC);
2198
2199                 if (page_count != 0 &&
2200                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2201                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2202                                " oap %p, page %p, srvlock %u\n",
2203                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2204                         break;
2205                 }
2206                 /* in llite being 'ready' equates to the page being locked
2207                  * until completion unlocks it.  commit_write submits a page
2208                  * as not ready because its unlock will happen unconditionally
2209                  * as the call returns.  if we race with commit_write giving
2210                  * us that page we dont' want to create a hole in the page
2211                  * stream, so we stop and leave the rpc to be fired by
2212                  * another dirtier or kupdated interval (the not ready page
2213                  * will still be on the dirty list).  we could call in
2214                  * at the end of ll_file_write to process the queue again. */
2215                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2216                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2217                         if (rc < 0)
2218                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2219                                                 "instead of ready\n", oap,
2220                                                 oap->oap_page, rc);
2221                         switch (rc) {
2222                         case -EAGAIN:
2223                                 /* llite is telling us that the page is still
2224                                  * in commit_write and that we should try
2225                                  * and put it in an rpc again later.  we
2226                                  * break out of the loop so we don't create
2227                                  * a hole in the sequence of pages in the rpc
2228                                  * stream.*/
2229                                 oap = NULL;
2230                                 break;
2231                         case -EINTR:
2232                                 /* the io isn't needed.. tell the checks
2233                                  * below to complete the rpc with EINTR */
2234                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2235                                 oap->oap_count = -EINTR;
2236                                 break;
2237                         case 0:
2238                                 oap->oap_async_flags |= ASYNC_READY;
2239                                 break;
2240                         default:
2241                                 LASSERTF(0, "oap %p page %p returned %d "
2242                                             "from make_ready\n", oap,
2243                                             oap->oap_page, rc);
2244                                 break;
2245                         }
2246                 }
2247                 if (oap == NULL)
2248                         break;
2249                 /*
2250                  * Page submitted for IO has to be locked. Either by
2251                  * ->ap_make_ready() or by higher layers.
2252                  */
2253 #if defined(__KERNEL__) && defined(__linux__)
2254                  if(!(PageLocked(oap->oap_page) &&
2255                      (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2256                         CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2257                                oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2258                         LBUG();
2259                 }
2260 #endif
2261                 /* If there is a gap at the start of this page, it can't merge
2262                  * with any previous page, so we'll hand the network a
2263                  * "fragmented" page array that it can't transfer in 1 RDMA */
2264                 if (page_count != 0 && oap->oap_page_off != 0)
2265                         break;
2266
2267                 /* take the page out of our book-keeping */
2268                 list_del_init(&oap->oap_pending_item);
2269                 lop_update_pending(cli, lop, cmd, -1);
2270                 list_del_init(&oap->oap_urgent_item);
2271
2272                 if (page_count == 0)
2273                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2274                                           (PTLRPC_MAX_BRW_SIZE - 1);
2275
2276                 /* ask the caller for the size of the io as the rpc leaves. */
2277                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2278                         oap->oap_count =
2279                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2280                 if (oap->oap_count <= 0) {
2281                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2282                                oap->oap_count);
2283                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2284                         continue;
2285                 }
2286
2287                 /* now put the page back in our accounting */
2288                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2289                 if (page_count == 0)
2290                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2291                 if (++page_count >= cli->cl_max_pages_per_rpc)
2292                         break;
2293
2294                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2295                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2296                  * have the same alignment as the initial writes that allocated
2297                  * extents on the server. */
2298                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2299                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2300                 if (ending_offset == 0)
2301                         break;
2302
2303                 /* If there is a gap at the end of this page, it can't merge
2304                  * with any subsequent pages, so we'll hand the network a
2305                  * "fragmented" page array that it can't transfer in 1 RDMA */
2306                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2307                         break;
2308         }
2309
2310         osc_wake_cache_waiters(cli);
2311
2312         if (page_count == 0)
2313                 RETURN(0);
2314
2315         loi_list_maint(cli, loi);
2316
2317         client_obd_list_unlock(&cli->cl_loi_list_lock);
2318
2319         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2320         if (IS_ERR(req)) {
2321                 /* this should happen rarely and is pretty bad, it makes the
2322                  * pending list not follow the dirty order */
2323                 client_obd_list_lock(&cli->cl_loi_list_lock);
2324                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2325                         list_del_init(&oap->oap_rpc_item);
2326
2327                         /* queued sync pages can be torn down while the pages
2328                          * were between the pending list and the rpc */
2329                         if (oap->oap_interrupted) {
2330                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2331                                 osc_ap_completion(cli, NULL, oap, 0,
2332                                                   oap->oap_count);
2333                                 continue;
2334                         }
2335                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2336                 }
2337                 loi_list_maint(cli, loi);
2338                 RETURN(PTR_ERR(req));
2339         }
2340
2341         aa = ptlrpc_req_async_args(req);
2342
2343         if (cmd == OBD_BRW_READ) {
2344                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2345                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2346                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2347                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2348         } else {
2349                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2350                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2351                                  cli->cl_w_in_flight);
2352                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2353                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2354         }
2355         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2356
2357         client_obd_list_lock(&cli->cl_loi_list_lock);
2358
2359         if (cmd == OBD_BRW_READ)
2360                 cli->cl_r_in_flight++;
2361         else
2362                 cli->cl_w_in_flight++;
2363
2364         /* queued sync pages can be torn down while the pages
2365          * were between the pending list and the rpc */
2366         tmp = NULL;
2367         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2368                 /* only one oap gets a request reference */
2369                 if (tmp == NULL)
2370                         tmp = oap;
2371                 if (oap->oap_interrupted && !req->rq_intr) {
2372                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2373                                oap, req);
2374                         ptlrpc_mark_interrupted(req);
2375                 }
2376         }
2377         if (tmp != NULL)
2378                 tmp->oap_request = ptlrpc_request_addref(req);
2379
2380         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2381                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2382
2383         req->rq_interpret_reply = brw_interpret;
2384         ptlrpcd_add_req(req);
2385         RETURN(1);
2386 }
2387
/* Dump the per-object io state: whether the loi sits on the ready list,
 * and the pending count / urgent-list status for its write and read
 * queues.  Note: the final line must NOT end in a backslash, or the
 * macro silently swallows the following source line. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2396
2397 /* This is called by osc_check_rpcs() to find which objects have pages that
2398  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2399 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2400 {
2401         ENTRY;
2402         /* first return all objects which we already know to have
2403          * pages ready to be stuffed into rpcs */
2404         if (!list_empty(&cli->cl_loi_ready_list))
2405                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2406                                   struct lov_oinfo, loi_cli_item));
2407
2408         /* then if we have cache waiters, return all objects with queued
2409          * writes.  This is especially important when many small files
2410          * have filled up the cache and not been fired into rpcs because
2411          * they don't pass the nr_pending/object threshhold */
2412         if (!list_empty(&cli->cl_cache_waiters) &&
2413             !list_empty(&cli->cl_loi_write_list))
2414                 RETURN(list_entry(cli->cl_loi_write_list.next,
2415                                   struct lov_oinfo, loi_write_item));
2416
2417         /* then return all queued objects when we have an invalid import
2418          * so that they get flushed */
2419         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2420                 if (!list_empty(&cli->cl_loi_write_list))
2421                         RETURN(list_entry(cli->cl_loi_write_list.next,
2422                                           struct lov_oinfo, loi_write_item));
2423                 if (!list_empty(&cli->cl_loi_read_list))
2424                         RETURN(list_entry(cli->cl_loi_read_list.next,
2425                                           struct lov_oinfo, loi_read_item));
2426         }
2427         RETURN(NULL);
2428 }
2429
/* called with the loi list lock held.
 *
 * Walks the objects that have io pending (as selected by osc_next_loi())
 * and fires read/write rpcs for each via osc_send_oap_rpc() until
 * cl_max_rpcs_in_flight is reached.  osc_send_oap_rpc() returns <0 on
 * error, 0 when it backed off, >0 when an rpc was sent. */
static void osc_check_rpcs(struct client_obd *cli)
{
        struct lov_oinfo *loi;
        int rc = 0, race_counter = 0;
        ENTRY;

        while ((loi = osc_next_loi(cli)) != NULL) {
                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));

                /* stop once the client-wide rpc concurrency cap is hit */
                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
                        break;

                /* attempt some read/write balancing by alternating between
                 * reads and writes in an object.  The makes_rpc checks here
                 * would be redundant if we were getting read/write work items
                 * instead of objects.  we don't want send_oap_rpc to drain a
                 * partial read pending queue when we're given this object to
                 * do io on writes while there are cache waiters */
                if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
                                              &loi->loi_write_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }
                if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
                        rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
                                              &loi->loi_read_lop);
                        if (rc < 0)
                                break;
                        if (rc > 0)
                                race_counter = 0;
                        else
                                race_counter++;
                }

                /* attempt some inter-object balancing by issueing rpcs
                 * for each object in turn */
                if (!list_empty(&loi->loi_cli_item))
                        list_del_init(&loi->loi_cli_item);
                if (!list_empty(&loi->loi_write_item))
                        list_del_init(&loi->loi_write_item);
                if (!list_empty(&loi->loi_read_item))
                        list_del_init(&loi->loi_read_item);

                /* re-queue the object on whichever lists it still belongs */
                loi_list_maint(cli, loi);

                /* send_oap_rpc fails with 0 when make_ready tells it to
                 * back off.  llite's make_ready does this when it tries
                 * to lock a page queued for write that is already locked.
                 * we want to try sending rpcs from many objects, but we
                 * don't want to spin failing with 0.  */
                if (race_counter == 10)
                        break;
        }
        EXIT;
}
2491
2492 /* we're trying to queue a page in the osc so we're subject to the
2493  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2494  * If the osc's queued pages are already at that limit, then we want to sleep
2495  * until there is space in the osc's queue for us.  We also may be waiting for
2496  * write credits from the OST if there are RPCs in flight that may return some
2497  * before we fall back to sync writes.
2498  *
2499  * We need this know our allocation was granted in the presence of signals */
2500 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2501 {
2502         int rc;
2503         ENTRY;
2504         client_obd_list_lock(&cli->cl_loi_list_lock);
2505         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2506         client_obd_list_unlock(&cli->cl_loi_list_lock);
2507         RETURN(rc);
2508 };
2509
/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
 * grant or cache space.
 *
 * Reserves one page worth of dirty-cache accounting and write grant for
 * \a oap.  Returns 0 on success, -EDQUOT to make the caller fall back to
 * sync io, -EINTR if interrupted while waiting, or the rc filled in by
 * whoever granted our waiter entry. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                           struct osc_async_page *oap)
{
        struct osc_cache_waiter ocw;
        struct l_wait_info lwi = { 0 };

        ENTRY;

        CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
               "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
               cli->cl_dirty_max, obd_max_dirty_pages,
               cli->cl_lost_grant, cli->cl_avail_grant);

        /* force the caller to try sync io.  this can jump the list
         * of queued writes and create a discontiguous rpc stream */
        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
            loi->loi_ar.ar_force_sync)
                RETURN(-EDQUOT);

        /* Hopefully normal case - cache space and write credits available */
        if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
            (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
            (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
                /* account for ourselves */
                osc_consume_write_grant(cli, &oap->oap_brw_page);
                RETURN(0);
        }

        /* Make sure that there are write rpcs in flight to wait for.  This
         * is a little silly as this object may not have any pending but
         * other objects sure might. */
        if (cli->cl_w_in_flight) {
                list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                cfs_waitq_init(&ocw.ocw_waitq);
                ocw.ocw_oap = oap;
                ocw.ocw_rc = 0;

                /* kick off rpcs that may return grant before we sleep */
                loi_list_maint(cli, loi);
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                CDEBUG(D_CACHE, "sleeping for cache space\n");
                /* lwi = { 0 }: no timeout; sleep until ocw_granted() says
                 * our entry was removed or no rpcs remain in flight */
                l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);

                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* still on the waiter list: nobody granted us space, so we
                 * presumably stopped waiting for another reason - back out */
                if (!list_empty(&ocw.ocw_entry)) {
                        list_del(&ocw.ocw_entry);
                        RETURN(-EINTR);
                }
                /* whoever dequeued us filled in ocw_rc */
                RETURN(ocw.ocw_rc);
        }

        RETURN(-EDQUOT);
}
2566
2567 /**
2568  * Checks if requested extent lock is compatible with a lock under the page.
2569  *
2570  * Checks if the lock under \a page is compatible with a read or write lock
2571  * (specified by \a rw) for an extent [\a start , \a end].
2572  *
2573  * \param exp osc export
2574  * \param lsm striping information for the file
2575  * \param res osc_async_page placeholder
2576  * \param rw OBD_BRW_READ if requested for reading,
2577  *           OBD_BRW_WRITE if requested for writing
2578  * \param start start of the requested extent
2579  * \param end end of the requested extent
2580  * \param cookie transparent parameter for passing locking context
2581  *
2582  * \post result == 1, *cookie == context, appropriate lock is referenced or
2583  * \post result == 0
2584  *
2585  * \retval 1 owned lock is reused for the request
2586  * \retval 0 no lock reused for the request
2587  *
2588  * \see osc_release_short_lock
2589  */
2590 static int osc_reget_short_lock(struct obd_export *exp,
2591                                 struct lov_stripe_md *lsm,
2592                                 void **res, int rw,
2593                                 obd_off start, obd_off end,
2594                                 void **cookie)
2595 {
2596         struct osc_async_page *oap = *res;
2597         int rc;
2598
2599         ENTRY;
2600
2601         spin_lock(&oap->oap_lock);
2602         rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2603                                   start, end, cookie);
2604         spin_unlock(&oap->oap_lock);
2605
2606         RETURN(rc);
2607 }
2608
2609 /**
2610  * Releases a reference to a lock taken in a "fast" way.
2611  *
2612  * Releases a read or a write (specified by \a rw) lock
2613  * referenced by \a cookie.
2614  *
2615  * \param exp osc export
2616  * \param lsm striping information for the file
2617  * \param end end of the locked extent
2618  * \param rw OBD_BRW_READ if requested for reading,
2619  *           OBD_BRW_WRITE if requested for writing
2620  * \param cookie transparent parameter for passing locking context
2621  *
2622  * \post appropriate lock is dereferenced
2623  *
2624  * \see osc_reget_short_lock
2625  */
2626 static int osc_release_short_lock(struct obd_export *exp,
2627                                   struct lov_stripe_md *lsm, obd_off end,
2628                                   void *cookie, int rw)
2629 {
2630         ENTRY;
2631         ldlm_lock_fast_release(cookie, rw);
2632         /* no error could have happened at this layer */
2633         RETURN(0);
2634 }
2635
2636 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2637                         struct lov_oinfo *loi, cfs_page_t *page,
2638                         obd_off offset, struct obd_async_page_ops *ops,
2639                         void *data, void **res, int nocache,
2640                         struct lustre_handle *lockh)
2641 {
2642         struct osc_async_page *oap;
2643         struct ldlm_res_id oid;
2644         int rc = 0;
2645         ENTRY;
2646
2647         if (!page)
2648                 return size_round(sizeof(*oap));
2649
2650         oap = *res;
2651         oap->oap_magic = OAP_MAGIC;
2652         oap->oap_cli = &exp->exp_obd->u.cli;
2653         oap->oap_loi = loi;
2654
2655         oap->oap_caller_ops = ops;
2656         oap->oap_caller_data = data;
2657
2658         oap->oap_page = page;
2659         oap->oap_obj_off = offset;
2660
2661         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2662         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2663         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2664         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2665
2666         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2667
2668         spin_lock_init(&oap->oap_lock);
2669
2670         /* If the page was marked as notcacheable - don't add to any locks */
2671         if (!nocache) {
2672                 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2673                 /* This is the only place where we can call cache_add_extent
2674                    without oap_lock, because this page is locked now, and
2675                    the lock we are adding it to is referenced, so cannot lose
2676                    any pages either. */
2677                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2678                 if (rc)
2679                         RETURN(rc);
2680         }
2681
2682         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2683         RETURN(0);
2684 }
2685
2686 struct osc_async_page *oap_from_cookie(void *cookie)
2687 {
2688         struct osc_async_page *oap = cookie;
2689         if (oap->oap_magic != OAP_MAGIC)
2690                 return ERR_PTR(-EINVAL);
2691         return oap;
2692 };
2693
/* Queue a page for async io on object \a loi (defaulting to the first
 * stripe of \a lsm).  For writes this first reserves cache space and
 * write grant via osc_enter_cache(), possibly sleeping; on success the
 * oap lands on the object's pending list and the rpc engine is poked.
 *
 * Returns 0 on success, -EIO on an invalid import, -EBUSY if the page is
 * already queued, -EDQUOT/-ENOMEM from the quota check, or the rc of
 * osc_enter_cache(). */
static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                              struct lov_oinfo *loi, void *cookie,
                              int cmd, obd_off off, int count,
                              obd_flag brw_flags, enum async_flags async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* already on a pending/urgent/rpc list - caller must not requeue */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        /* check if the file's owner/group is over quota */
#ifdef HAVE_QUOTA_SUPPORT
        if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                struct obd_async_page_ops *ops;
                struct obdo *oa;

                OBDO_ALLOC(oa);
                if (oa == NULL)
                        RETURN(-ENOMEM);

                /* fill an obdo with the file's owner/group for the check */
                ops = oap->oap_caller_ops;
                ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
                if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
                    NO_QUOTA)
                        rc = -EDQUOT;

                OBDO_FREE(oa);
                if (rc)
                        RETURN(rc);
        }
#endif

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        if (cmd & OBD_BRW_WRITE) {
                /* may drop and retake cl_loi_list_lock while waiting */
                rc = osc_enter_cache(cli, loi, oap);
                if (rc) {
                        client_obd_list_unlock(&cli->cl_loi_list_lock);
                        RETURN(rc);
                }
        }

        osc_oap_to_pending(oap);
        loi_list_maint(cli, loi);

        LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
                  cmd);

        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(0);
}
2768
/* aka (~was & now & flag), but this is more clear :)
 * true iff "flag" is clear in "was" but set in "now", i.e. the flag is
 * being newly set by this call */
#define SETTING(was, now, flag) (!(was & flag) && (now & flag))
2771
/* OR additional async flags into an already-pending oap.  Newly setting
 * ASYNC_URGENT moves the page onto its list's urgent queue (unless it is
 * already part of an rpc).  Returns 0 on success, -EINVAL if the page is
 * not on a pending list, or -EIO on an invalid import. */
static int osc_set_async_flags(struct obd_export *exp,
                               struct lov_stripe_md *lsm,
                               struct lov_oinfo *loi, void *cookie,
                               obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        /*
         * bug 7311: OST-side locking is only supported for liblustre for now
         * (and liblustre never calls obd_set_async_flags(). I hope.), generic
         * implementation has to handle case where OST-locked page was picked
         * up by, e.g., ->writepage().
         */
        LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
                                     * tread here. */

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the read or write queue this page belongs to */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        if (list_empty(&oap->oap_pending_item))
                GOTO(out, rc = -EINVAL);

        /* nothing new being set - done */
        if ((oap->oap_async_flags & async_flags) == async_flags)
                GOTO(out, rc = 0);

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                oap->oap_async_flags |= ASYNC_READY;

        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
                if (list_empty(&oap->oap_rpc_item)) {
                        list_add(&oap->oap_urgent_item, &lop->lop_urgent);
                        loi_list_maint(cli, loi);
                }
        }

        LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
                        oap->oap_async_flags);
out:
        /* a newly-urgent page may make an rpc worth sending now */
        osc_check_rpcs(cli);
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2834
/* Queue a page on the group-io pending list for \a oig rather than the
 * regular pending list; io does not start until osc_trigger_group_io()
 * moves the group onto the normal pending lists.  Returns 0 on success,
 * -EIO on an invalid import, -EBUSY if the page is already queued, or
 * the rc of oig_add_one(). */
static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                             struct lov_oinfo *loi,
                             struct obd_io_group *oig, void *cookie,
                             int cmd, obd_off off, int count,
                             obd_flag brw_flags,
                             obd_flag async_flags)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct osc_async_page *oap;
        struct loi_oap_pages *lop;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                RETURN(-EIO);

        /* already queued elsewhere - refuse to double-queue */
        if (!list_empty(&oap->oap_pending_item) ||
            !list_empty(&oap->oap_urgent_item) ||
            !list_empty(&oap->oap_rpc_item))
                RETURN(-EBUSY);

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        client_obd_list_lock(&cli->cl_loi_list_lock);

        oap->oap_cmd = cmd;
        oap->oap_page_off = off;
        oap->oap_count = count;
        oap->oap_brw_flags = brw_flags;
        oap->oap_async_flags = async_flags;

        /* pick the read or write queue for this command */
        if (cmd & OBD_BRW_WRITE)
                lop = &loi->loi_write_lop;
        else
                lop = &loi->loi_read_lop;

        list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
        if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
                oap->oap_oig = oig;
                rc = oig_add_one(oig, &oap->oap_occ);
        }

        LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
                  oap, oap->oap_page, rc);

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        RETURN(rc);
}
2889
2890 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2891                                  struct loi_oap_pages *lop, int cmd)
2892 {
2893         struct list_head *pos, *tmp;
2894         struct osc_async_page *oap;
2895
2896         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2897                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2898                 list_del(&oap->oap_pending_item);
2899                 osc_oap_to_pending(oap);
2900         }
2901         loi_list_maint(cli, loi);
2902 }
2903
2904 static int osc_trigger_group_io(struct obd_export *exp,
2905                                 struct lov_stripe_md *lsm,
2906                                 struct lov_oinfo *loi,
2907                                 struct obd_io_group *oig)
2908 {
2909         struct client_obd *cli = &exp->exp_obd->u.cli;
2910         ENTRY;
2911
2912         if (loi == NULL)
2913                 loi = lsm->lsm_oinfo[0];
2914
2915         client_obd_list_lock(&cli->cl_loi_list_lock);
2916
2917         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2918         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2919
2920         osc_check_rpcs(cli);
2921         client_obd_list_unlock(&cli->cl_loi_list_lock);
2922
2923         RETURN(0);
2924 }
2925
/* Undo osc_queue_async_io()/osc_queue_group_io(): release the page's
 * cache accounting, take it off the urgent and pending lists, and remove
 * it from the extent lock cache.  Fails with -EBUSY if the page is part
 * of an rpc currently in flight. */
static int osc_teardown_async_page(struct obd_export *exp,
                                   struct lov_stripe_md *lsm,
                                   struct lov_oinfo *loi, void *cookie)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct loi_oap_pages *lop;
        struct osc_async_page *oap;
        int rc = 0;
        ENTRY;

        oap = oap_from_cookie(cookie);
        if (IS_ERR(oap))
                RETURN(PTR_ERR(oap));

        if (loi == NULL)
                loi = lsm->lsm_oinfo[0];

        /* pick the read or write queue the page was on */
        if (oap->oap_cmd & OBD_BRW_WRITE) {
                lop = &loi->loi_write_lop;
        } else {
                lop = &loi->loi_read_lop;
        }

        client_obd_list_lock(&cli->cl_loi_list_lock);

        /* can't tear down a page already handed to the network */
        if (!list_empty(&oap->oap_rpc_item))
                GOTO(out, rc = -EBUSY);

        /* return cache/grant accounting and let any waiters have it */
        osc_exit_cache(cli, oap, 0);
        osc_wake_cache_waiters(cli);

        if (!list_empty(&oap->oap_urgent_item)) {
                list_del_init(&oap->oap_urgent_item);
                oap->oap_async_flags &= ~ASYNC_URGENT;
        }
        if (!list_empty(&oap->oap_pending_item)) {
                list_del_init(&oap->oap_pending_item);
                lop_update_pending(cli, lop, oap->oap_cmd, -1);
        }
        loi_list_maint(cli, loi);
        cache_remove_extent(cli->cl_cache, oap);

        LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(rc);
}
2973
/* ldlm blocking/cancel callback for osc extent locks.
 *
 * LDLM_CB_BLOCKING: a conflicting lock request arrived - cancel our lock.
 * LDLM_CB_CANCELING: the lock is going away - drop the pages cached under
 * it and notify the client's extent-cancel callback, if one is set. */
int osc_extent_blocking_cb(struct ldlm_lock *lock,
                           struct ldlm_lock_desc *new, void *data,
                           int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* small non-NULL values are almost certainly a corrupted pointer */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {

                ldlm_lock2handle(lock, &lockh);
                /* This lock wasn't granted, don't try to do anything */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
                                  &lockh);

                if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
                        lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
                                                          lock, new, data,flag);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}
EXPORT_SYMBOL(osc_extent_blocking_cb);
3016
/* Attach \a data (on linux, an inode pointer) as the l_ast_data of the
 * lock named by \a lockh, asserting that any different inode already
 * attached is in the process of being freed.  Also ORs LDLM_FL_NO_LRU
 * from \a flags into the lock. */
static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                                    int flags)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);

        if (lock == NULL) {
                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
                return;
        }
        lock_res_and_lock(lock);
#if defined (__KERNEL__) && defined (__linux__)
        /* Liang XXX: Darwin and Winnt checking should be added */
        if (lock->l_ast_data && lock->l_ast_data != data) {
                struct inode *new_inode = data;
                struct inode *old_inode = lock->l_ast_data;
                /* swapping in a new inode is only legitimate when the old
                 * one is on its way out */
                if (!(old_inode->i_state & I_FREEING))
                        LDLM_ERROR(lock, "inconsistent l_ast_data found");
                LASSERTF(old_inode->i_state & I_FREEING,
                         "Found existing inode %p/%lu/%u state %lu in lock: "
                         "setting data to %p/%lu/%u\n", old_inode,
                         old_inode->i_ino, old_inode->i_generation,
                         old_inode->i_state,
                         new_inode, new_inode->i_ino, new_inode->i_generation);
        }
#endif
        lock->l_ast_data = data;
        lock->l_flags |= (flags & LDLM_FL_NO_LRU);
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
}
3047
3048 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3049                              ldlm_iterator_t replace, void *data)
3050 {
3051         struct ldlm_res_id res_id;
3052         struct obd_device *obd = class_exp2obd(exp);
3053
3054         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3055         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3056         return 0;
3057 }
3058
3059 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3060                             struct obd_info *oinfo, int intent, int rc)
3061 {
3062         ENTRY;
3063
3064         if (intent) {
3065                 /* The request was created before ldlm_cli_enqueue call. */
3066                 if (rc == ELDLM_LOCK_ABORTED) {
3067                         struct ldlm_reply *rep;
3068                         rep = req_capsule_server_get(&req->rq_pill,
3069                                                      &RMF_DLM_REP);
3070
3071                         LASSERT(rep != NULL);
3072                         if (rep->lock_policy_res1)
3073                                 rc = rep->lock_policy_res1;
3074                 }
3075         }
3076
3077         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3078                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3079                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3080                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3081                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3082         }
3083
3084         if (!rc)
3085                 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3086
3087         /* Call the update callback. */
3088         rc = oinfo->oi_cb_up(oinfo, rc);
3089         RETURN(rc);
3090 }
3091
/* Reply-interpret callback for an asynchronous extent lock enqueue.
 * Finishes the ldlm side of the enqueue, runs osc_enqueue_fini(), and
 * immediately drops the lock reference on success (async locks are
 * released as soon as they are obtained; see the comment above
 * osc_enqueue()). */
static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                 struct osc_enqueue_args *aa, int rc)
{
        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
        struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
        struct ldlm_lock *lock;

        /* ldlm_cli_enqueue is holding a reference on the lock, so it must
         * be valid. */
        lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);

        /* Complete obtaining the lock procedure. */
        rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                   aa->oa_ei->ei_mode,
                                   &aa->oa_oi->oi_flags,
                                   &lsm->lsm_oinfo[0]->loi_lvb,
                                   sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                   lustre_swab_ost_lvb,
                                   aa->oa_oi->oi_lockh, rc);

        /* Complete osc stuff. */
        rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);

        /* Release the lock for async request. */
        if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
                ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);

        LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
                 aa->oa_oi->oi_lockh, req, aa);
        /* Drop the reference taken by ldlm_handle2lock() above. */
        LDLM_LOCK_PUT(lock);
        return rc;
}
3124
3125 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3126  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3127  * other synchronous requests, however keeping some locks and trying to obtain
3128  * others may take a considerable amount of time in a case of ost failure; and
3129  * when other sync requests do not get released lock from a client, the client
3130  * is excluded from the cluster -- such scenarious make the life difficult, so
3131  * release locks just after they are obtained. */
3132 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3133                        struct ldlm_enqueue_info *einfo,
3134                        struct ptlrpc_request_set *rqset)
3135 {
3136         struct ldlm_res_id res_id;
3137         struct obd_device *obd = exp->exp_obd;
3138         struct ptlrpc_request *req = NULL;
3139         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3140         ldlm_mode_t mode;
3141         int rc;
3142         ENTRY;
3143
3144
3145         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3146                            oinfo->oi_md->lsm_object_gr, &res_id);
3147         /* Filesystem lock extents are extended to page boundaries so that
3148          * dealing with the page cache is a little smoother.  */
3149         oinfo->oi_policy.l_extent.start -=
3150                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3151         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3152
3153         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3154                 goto no_match;
3155
3156         /* Next, search for already existing extent locks that will cover us */
3157         /* If we're trying to read, we also search for an existing PW lock.  The
3158          * VFS and page cache already protect us locally, so lots of readers/
3159          * writers can share a single PW lock.
3160          *
3161          * There are problems with conversion deadlocks, so instead of
3162          * converting a read lock to a write lock, we'll just enqueue a new
3163          * one.
3164          *
3165          * At some point we should cancel the read lock instead of making them
3166          * send us a blocking callback, but there are problems with canceling
3167          * locks out from other users right now, too. */
3168         mode = einfo->ei_mode;
3169         if (einfo->ei_mode == LCK_PR)
3170                 mode |= LCK_PW;
3171         mode = ldlm_lock_match(obd->obd_namespace,
3172                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3173                                einfo->ei_type, &oinfo->oi_policy, mode,
3174                                oinfo->oi_lockh);
3175         if (mode) {
3176                 /* addref the lock only if not async requests and PW lock is
3177                  * matched whereas we asked for PR. */
3178                 if (!rqset && einfo->ei_mode != mode)
3179                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3180                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3181                                         oinfo->oi_flags);
3182                 if (intent) {
3183                         /* I would like to be able to ASSERT here that rss <=
3184                          * kms, but I can't, for reasons which are explained in
3185                          * lov_enqueue() */
3186                 }
3187
3188                 /* We already have a lock, and it's referenced */
3189                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3190
3191                 /* For async requests, decref the lock. */
3192                 if (einfo->ei_mode != mode)
3193                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3194                 else if (rqset)
3195                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3196
3197                 RETURN(ELDLM_OK);
3198         }
3199
3200  no_match:
3201         if (intent) {
3202                 CFS_LIST_HEAD(cancels);
3203                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3204                                            &RQF_LDLM_ENQUEUE_LVB);
3205                 if (req == NULL)
3206                         RETURN(-ENOMEM);
3207
3208                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3209                 if (rc)
3210                         RETURN(rc);
3211
3212                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3213                                      sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb));
3214                 ptlrpc_request_set_replen(req);
3215         }
3216
3217         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3218         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3219
3220         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
3221                               &oinfo->oi_policy, &oinfo->oi_flags,
3222                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3223                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3224                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3225                               rqset ? 1 : 0);
3226         if (rqset) {
3227                 if (!rc) {
3228                         struct osc_enqueue_args *aa;
3229                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3230                         aa = ptlrpc_req_async_args(req);
3231                         aa->oa_oi = oinfo;
3232                         aa->oa_ei = einfo;
3233                         aa->oa_exp = exp;
3234
3235                         req->rq_interpret_reply = osc_enqueue_interpret;
3236                         ptlrpc_set_add_req(rqset, req);
3237                 } else if (intent) {
3238                         ptlrpc_req_finished(req);
3239                 }
3240                 RETURN(rc);
3241         }
3242
3243         rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3244         if (intent)
3245                 ptlrpc_req_finished(req);
3246
3247         RETURN(rc);
3248 }
3249
3250 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3251                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3252                      int *flags, void *data, struct lustre_handle *lockh)
3253 {
3254         struct ldlm_res_id res_id;
3255         struct obd_device *obd = exp->exp_obd;
3256         int lflags = *flags;
3257         ldlm_mode_t rc;
3258         ENTRY;
3259
3260         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3261
3262         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3263                 RETURN(-EIO);
3264
3265         /* Filesystem lock extents are extended to page boundaries so that
3266          * dealing with the page cache is a little smoother */
3267         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3268         policy->l_extent.end |= ~CFS_PAGE_MASK;
3269
3270         /* Next, search for already existing extent locks that will cover us */
3271         /* If we're trying to read, we also search for an existing PW lock.  The
3272          * VFS and page cache already protect us locally, so lots of readers/
3273          * writers can share a single PW lock. */
3274         rc = mode;
3275         if (mode == LCK_PR)
3276                 rc |= LCK_PW;
3277         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3278                              &res_id, type, policy, rc, lockh);
3279         if (rc) {
3280                 osc_set_data_with_check(lockh, data, lflags);
3281                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3282                         ldlm_lock_addref(lockh, LCK_PR);
3283                         ldlm_lock_decref(lockh, LCK_PW);
3284                 }
3285                 RETURN(rc);
3286         }
3287         RETURN(rc);
3288 }
3289
/* Drop one reference on the lock named by @lockh.  Group locks are
 * additionally cancelled on the server when the reference is dropped;
 * other modes just decref.  @md is unused.  Always returns 0. */
static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
                      __u32 mode, struct lustre_handle *lockh)
{
        ENTRY;

        if (unlikely(mode == LCK_GROUP))
                ldlm_lock_decref_and_cancel(lockh, mode);
        else
                ldlm_lock_decref(lockh, mode);

        RETURN(0);
}
3302
3303 static int osc_cancel_unused(struct obd_export *exp,
3304                              struct lov_stripe_md *lsm, int flags,
3305                              void *opaque)
3306 {
3307         struct obd_device *obd = class_exp2obd(exp);
3308         struct ldlm_res_id res_id, *resp = NULL;
3309
3310         if (lsm != NULL) {
3311                 resp = osc_build_res_name(lsm->lsm_object_id,
3312                                           lsm->lsm_object_gr, &res_id);
3313         }
3314
3315         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3316 }
3317
3318 static int osc_join_lru(struct obd_export *exp,
3319                         struct lov_stripe_md *lsm, int join)
3320 {
3321         struct obd_device *obd = class_exp2obd(exp);
3322         struct ldlm_res_id res_id, *resp = NULL;
3323
3324         if (lsm != NULL) {
3325                 resp = osc_build_res_name(lsm->lsm_object_id,
3326                                           lsm->lsm_object_gr, &res_id);
3327         }
3328
3329         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3330 }
3331
3332 static int osc_statfs_interpret(struct ptlrpc_request *req,
3333                                 struct osc_async_args *aa, int rc)
3334 {
3335         struct obd_statfs *msfs;
3336         ENTRY;
3337
3338         if (rc != 0)
3339                 GOTO(out, rc);
3340
3341         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3342         if (msfs == NULL) {
3343                 GOTO(out, rc = -EPROTO);
3344         }
3345
3346         *aa->aa_oi->oi_osfs = *msfs;
3347 out:
3348         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3349         RETURN(rc);
3350 }
3351
3352 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3353                             __u64 max_age, struct ptlrpc_request_set *rqset)
3354 {
3355         struct ptlrpc_request *req;
3356         struct osc_async_args *aa;
3357         int                    rc;
3358         ENTRY;
3359
3360         /* We could possibly pass max_age in the request (as an absolute
3361          * timestamp or a "seconds.usec ago") so the target can avoid doing
3362          * extra calls into the filesystem if that isn't necessary (e.g.
3363          * during mount that would help a bit).  Having relative timestamps
3364          * is not so great if request processing is slow, while absolute
3365          * timestamps are not ideal because they need time synchronization. */
3366         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3367         if (req == NULL)
3368                 RETURN(-ENOMEM);
3369
3370         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3371         if (rc) {
3372                 ptlrpc_request_free(req);
3373                 RETURN(rc);
3374         }
3375         ptlrpc_request_set_replen(req);
3376         req->rq_request_portal = OST_CREATE_PORTAL;
3377         ptlrpc_at_set_req_timeout(req);
3378
3379         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3380                 /* procfs requests not want stat in wait for avoid deadlock */
3381                 req->rq_no_resend = 1;
3382                 req->rq_no_delay = 1;
3383         }
3384
3385         req->rq_interpret_reply = osc_statfs_interpret;
3386         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3387         aa = ptlrpc_req_async_args(req);
3388         aa->aa_oi = oinfo;
3389
3390         ptlrpc_set_add_req(rqset, req);
3391         RETURN(0);
3392 }
3393
/* Synchronous OST_STATFS: fetch the target's obd_statfs into @osfs.
 * @max_age is accepted but not transmitted (see the comment below);
 * OBD_STATFS_NODELAY in @flags makes the request fail fast instead of
 * waiting for recovery. */
static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                      __u64 max_age, __u32 flags)
{
        struct obd_statfs     *msfs;
        struct ptlrpc_request *req;
        struct obd_import     *imp = NULL;
        int rc;
        ENTRY;

        /*Since the request might also come from lprocfs, so we need
         *sync this with client_disconnect_export Bug15684*/
        down_read(&obd->u.cli.cl_sem);
        if (obd->u.cli.cl_import)
                imp = class_import_get(obd->u.cli.cl_import);
        up_read(&obd->u.cli.cl_sem);
        if (!imp)
                RETURN(-ENODEV);

        /* We could possibly pass max_age in the request (as an absolute
         * timestamp or a "seconds.usec ago") so the target can avoid doing
         * extra calls into the filesystem if that isn't necessary (e.g.
         * during mount that would help a bit).  Having relative timestamps
         * is not so great if request processing is slow, while absolute
         * timestamps are not ideal because they need time synchronization. */
        req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

        /* The import reference was only needed for the allocation above. */
        class_import_put(imp);

        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        if (flags & OBD_STATFS_NODELAY) {
                /* procfs requests not want stat in wait for avoid deadlock */
                req->rq_no_resend = 1;
                req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
        if (msfs == NULL) {
                GOTO(out, rc = -EPROTO);
        }

        *osfs = *msfs;

        EXIT;
 out:
        /* Single exit: the request is always released, success or not. */
        ptlrpc_req_finished(req);
        return rc;
}
3456
3457 /* Retrieve object striping information.
3458  *
3459  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3460  * the maximum number of OST indices which will fit in the user buffer.
3461  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3462  */
3463 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3464 {
3465         struct lov_user_md lum, *lumk;
3466         int rc = 0, lum_size;
3467         ENTRY;
3468
3469         if (!lsm)
3470                 RETURN(-ENODATA);
3471
3472         if (copy_from_user(&lum, lump, sizeof(lum)))
3473                 RETURN(-EFAULT);
3474
3475         if (lum.lmm_magic != LOV_USER_MAGIC)
3476                 RETURN(-EINVAL);
3477
3478         if (lum.lmm_stripe_count > 0) {
3479                 lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
3480                 OBD_ALLOC(lumk, lum_size);
3481                 if (!lumk)
3482                         RETURN(-ENOMEM);
3483
3484                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
3485                 lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
3486         } else {
3487                 lum_size = sizeof(lum);
3488                 lumk = &lum;
3489         }
3490
3491         lumk->lmm_object_id = lsm->lsm_object_id;
3492         lumk->lmm_object_gr = lsm->lsm_object_gr;
3493         lumk->lmm_stripe_count = 1;
3494
3495         if (copy_to_user(lump, lumk, lum_size))
3496                 rc = -EFAULT;
3497
3498         if (lumk != &lum)
3499                 OBD_FREE(lumk, lum_size);
3500
3501         RETURN(rc);
3502 }
3503
3504
/* ioctl dispatcher for the OSC device.  Pins the module for the duration
 * of the call; every case exits through 'out' so module_put() always
 * pairs with the try_module_get() above.  @karg is the in-kernel ioctl
 * data, @uarg the original userspace pointer. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void *uarg)
{
        struct obd_device *obd = exp->exp_obd;
        struct obd_ioctl_data *data = karg;
        int err = 0;
        ENTRY;

        if (!try_module_get(THIS_MODULE)) {
                CERROR("Can't get module. Is it alive?");
                return -EINVAL;
        }
        switch (cmd) {
        case OBD_IOC_LOV_GET_CONFIG: {
                /* Report a synthetic single-target LOV config describing
                 * just this OSC. */
                char *buf;
                struct lov_desc *desc;
                struct obd_uuid uuid;

                buf = NULL;
                len = 0;
                if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
                        GOTO(out, err = -EINVAL);

                data = (struct obd_ioctl_data *)buf;

                /* inlbuf1 must hold a lov_desc, inlbuf2 a uuid. */
                if (sizeof(*desc) > data->ioc_inllen1) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                if (data->ioc_inllen2 < sizeof(uuid)) {
                        obd_ioctl_freedata(buf, len);
                        GOTO(out, err = -EINVAL);
                }

                desc = (struct lov_desc *)data->ioc_inlbuf1;
                desc->ld_tgt_count = 1;
                desc->ld_active_tgt_count = 1;
                desc->ld_default_stripe_count = 1;
                desc->ld_default_stripe_size = 0;
                desc->ld_default_stripe_offset = 0;
                desc->ld_pattern = 0;
                memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

                memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

                /* copy_to_user returns the uncopied byte count; map any
                 * non-zero result to -EFAULT. */
                err = copy_to_user((void *)uarg, buf, len);
                if (err)
                        err = -EFAULT;
                obd_ioctl_freedata(buf, len);
                GOTO(out, err);
        }
        case LL_IOC_LOV_SETSTRIPE:
                /* obd_alloc_memmd returns a positive size on success;
                 * normalize to 0. */
                err = obd_alloc_memmd(exp, karg);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case LL_IOC_LOV_GETSTRIPE:
                err = osc_getstripe(karg, uarg);
                GOTO(out, err);
        case OBD_IOC_CLIENT_RECOVER:
                err = ptlrpc_recover_import(obd->u.cli.cl_import,
                                            data->ioc_inlbuf1);
                if (err > 0)
                        err = 0;
                GOTO(out, err);
        case IOC_OSC_SET_ACTIVE:
                err = ptlrpc_set_import_active(obd->u.cli.cl_import,
                                               data->ioc_offset);
                GOTO(out, err);
        case OBD_IOC_POLL_QUOTACHECK:
                err = lquota_poll_check(quota_interface, exp,
                                        (struct if_quotacheck *)karg);
                GOTO(out, err);
        default:
                CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                       cmd, cfs_curproc_comm());
                GOTO(out, err = -ENOTTY);
        }
out:
        module_put(THIS_MODULE);
        return err;
}
3588
/* obd_get_info handler.  KEY_LOCK_TO_STRIPE is answered locally (an OSC
 * is single-stripe, so the answer is always stripe 0); KEY_LAST_ID and
 * KEY_FIEMAP are forwarded to the OST via synchronous OST_GET_INFO RPCs.
 * Any other key returns -EINVAL. */
static int osc_get_info(struct obd_export *exp, obd_count keylen,
                        void *key, __u32 *vallen, void *val,
                        struct lov_stripe_md *lsm)
{
        ENTRY;
        if (!vallen || !val)
                RETURN(-EFAULT);

        if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
                __u32 *stripe = val;
                *vallen = sizeof(*stripe);
                *stripe = 0;
                RETURN(0);
        } else if (KEY_IS(KEY_LAST_ID)) {
                /* Ask the OST for the last allocated object id. */
                struct ptlrpc_request *req;
                obd_id                *reply;
                char                  *tmp;
                int                    rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_LAST_ID);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                                     RCL_CLIENT, keylen);
                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
                memcpy(tmp, key, keylen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
                if (reply == NULL)
                        GOTO(out, rc = -EPROTO);

                *((obd_id *)val) = *reply;
        out:
                ptlrpc_req_finished(req);
                RETURN(rc);
        } else if (KEY_IS(KEY_FIEMAP)) {
                /* Forward a fiemap request; @val is both the input mapping
                 * request and the output buffer, *vallen bytes each way. */
                struct ptlrpc_request *req;
                struct ll_user_fiemap *reply;
                char *tmp;
                int rc;

                req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                           &RQF_OST_GET_INFO_FIEMAP);
                if (req == NULL)
                        RETURN(-ENOMEM);

                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
                                     RCL_CLIENT, keylen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_CLIENT, *vallen);
                req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
                                     RCL_SERVER, *vallen);

                rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
                if (rc) {
                        ptlrpc_request_free(req);
                        RETURN(rc);
                }

                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
                memcpy(tmp, key, keylen);
                tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                memcpy(tmp, val, *vallen);

                ptlrpc_request_set_replen(req);
                rc = ptlrpc_queue_wait(req);
                if (rc)
                        GOTO(out1, rc);

                reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
                if (reply == NULL)
                        GOTO(out1, rc = -EPROTO);

                memcpy(val, reply, *vallen);
        out1:
                ptlrpc_req_finished(req);

                RETURN(rc);
        }

        RETURN(-EINVAL);
}
3684
3685 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3686                                           void *aa, int rc)
3687 {
3688         struct llog_ctxt *ctxt;
3689         struct obd_import *imp = req->rq_import;
3690         ENTRY;
3691
3692         if (rc != 0)
3693                 RETURN(rc);
3694
3695         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3696         if (ctxt) {
3697                 if (rc == 0)
3698                         rc = llog_initiator_connect(ctxt);
3699                 else
3700                         CERROR("cannot establish connection for "
3701                                "ctxt %p: %d\n", ctxt, rc);
3702         }
3703
3704         llog_ctxt_put(ctxt);
3705         spin_lock(&imp->imp_lock);
3706         imp->imp_server_timeout = 1;
3707         imp->imp_pingable = 1;
3708         spin_unlock(&imp->imp_lock);
3709         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3710
3711         RETURN(rc);
3712 }
3713
/* obd_set_info_async handler.  A handful of keys (NEXT_ID, UNLINKED,
 * INIT_RECOV, CHECKSUM, FLUSH_CTX) are handled locally without an RPC;
 * everything else is packed into an OST_SET_INFO request and added to
 * @set.  KEY_MDS_CONN additionally records the MDS group in the creator
 * state and installs osc_setinfo_mds_conn_interpret() on the reply. */
static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct obd_device     *obd = exp->exp_obd;
        struct obd_import     *imp = class_exp2cliimp(exp);
        char                  *tmp;
        int                    rc;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

        if (KEY_IS(KEY_NEXT_ID)) {
                if (vallen != sizeof(obd_id))
                        RETURN(-ERANGE);
                if (val == NULL)
                        RETURN(-EINVAL);
                /* Creator continues one past the id the caller reports. */
                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
                       exp->exp_obd->obd_name,
                       obd->u.cli.cl_oscc.oscc_next_id);

                RETURN(0);
        }

        if (KEY_IS(KEY_UNLINKED)) {
                /* Objects were unlinked: clear the creator's no-space flag. */
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        if (KEY_IS(KEY_INIT_RECOV)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                spin_lock(&imp->imp_lock);
                imp->imp_initial_recov = *(int *)val;
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                       exp->exp_obd->obd_name,
                       imp->imp_initial_recov);
                RETURN(0);
        }

        if (KEY_IS(KEY_CHECKSUM)) {
                if (vallen != sizeof(int))
                        RETURN(-EINVAL);
                exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
                RETURN(0);
        }

        if (KEY_IS(KEY_FLUSH_CTX)) {
                sptlrpc_import_flush_my_ctx(imp);
                RETURN(0);
        }

        if (!set)
                RETURN(-EINVAL);

        /* We pass all other commands directly to OST. Since nobody calls osc
           methods directly and everybody is supposed to go through LOV, we
           assume lov checked invalid values for us.
           The only recognised values so far are evict_by_nid and mds_conn.
           Even if something bad goes through, we'd get a -EINVAL from OST
           anyway. */


        req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
                             RCL_CLIENT, keylen);
        req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
                             RCL_CLIENT, vallen);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
        memcpy(tmp, key, keylen);
        tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
        memcpy(tmp, val, vallen);

        if (KEY_IS(KEY_MDS_CONN)) {
                struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                /* Remember which MDS group owns objects created here. */
                oscc->oscc_oa.o_gr = (*(__u32 *)val);
                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
                LASSERT(oscc->oscc_oa.o_gr > 0);
                req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
        }

        ptlrpc_request_set_replen(req);
        ptlrpc_set_add_req(set, req);
        ptlrpc_check_set(set);

        RETURN(0);
}
3817
3818
/* llog operations for the size-replication context: only cancel is
 * needed on the client side. */
static struct llog_operations osc_size_repl_logops = {
        lop_cancel: llog_obd_repl_cancel
};

/* Populated once, lazily, by osc_llog_init() from llog_lvfs_ops. */
static struct llog_operations osc_mds_ost_orig_logops;
/* Set up the two llog contexts this OSC uses: the MDS->OST originator
 * context (catalogued by @catid) and the size-replication context.
 * Returns 0 on success or the llog_setup() error. */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *tgt, int count,
                         struct llog_catid *catid, struct obd_uuid *uuid)
{
        int rc;
        ENTRY;

        LASSERT(olg == &obd->obd_olg);
        /* One-time lazy initialization of the shared static ops table,
         * guarded by obd_dev_lock; lop_setup doubles as the "already
         * initialized" marker. */
        spin_lock(&obd->obd_dev_lock);
        if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
                osc_mds_ost_orig_logops = llog_lvfs_ops;
                osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
                osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
                osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
                osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
        }
        spin_unlock(&obd->obd_dev_lock);

        rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                        &catid->lci_logid, &osc_mds_ost_orig_logops);
        if (rc) {
                CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                GOTO (out, rc);
        }

        rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
                        NULL, &osc_size_repl_logops);
        if (rc)
                CERROR("failed LLOG_SIZE_REPL_CTXT\n");
out:
        if (rc) {
                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                       obd->obd_name, tgt->obd_name, count, catid, rc);
                CERROR("logid "LPX64":0x%x\n",
                       catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
        }
        RETURN(rc);
}
3862
3863 static int osc_llog_finish(struct obd_device *obd, int count)
3864 {
3865         struct llog_ctxt *ctxt;
3866         int rc = 0, rc2 = 0;
3867         ENTRY;
3868
3869         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3870         if (ctxt)
3871                 rc = llog_cleanup(ctxt);
3872
3873         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3874         if (ctxt)
3875                 rc2 = llog_cleanup(ctxt);
3876         if (!rc)
3877                 rc = rc2;
3878
3879         RETURN(rc);
3880 }
3881
3882 static int osc_reconnect(const struct lu_env *env,
3883                          struct obd_export *exp, struct obd_device *obd,
3884                          struct obd_uuid *cluuid,
3885                          struct obd_connect_data *data)
3886 {
3887         struct client_obd *cli = &obd->u.cli;
3888
3889         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3890                 long lost_grant;
3891
3892                 client_obd_list_lock(&cli->cl_loi_list_lock);
3893                 data->ocd_grant = cli->cl_avail_grant ?:
3894                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3895                 lost_grant = cli->cl_lost_grant;
3896                 cli->cl_lost_grant = 0;
3897                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3898
3899                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3900                        "cl_lost_grant: %ld\n", data->ocd_grant,
3901                        cli->cl_avail_grant, lost_grant);
3902                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3903                        " ocd_grant: %d\n", data->ocd_connect_flags,
3904                        data->ocd_version, data->ocd_grant);
3905         }
3906
3907         RETURN(0);
3908 }
3909
3910 static int osc_disconnect(struct obd_export *exp)
3911 {
3912         struct obd_device *obd = class_exp2obd(exp);
3913         struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3914         int rc;
3915
3916         if (obd->u.cli.cl_conn_count == 1)
3917                 /* flush any remaining cancel messages out to the target */
3918                 llog_sync(ctxt, exp);
3919
3920         llog_ctxt_put(ctxt);
3921
3922         rc = client_disconnect_export(exp);
3923         return rc;
3924 }
3925
/*
 * React to state transitions of the OST import (disconnect, inactive,
 * invalidate, active, connect-data received).  Called from the ptlrpc
 * import state machine.  Returns 0 or the observer-notification error.
 */
static int osc_import_event(struct obd_device *obd,
                            struct obd_import *imp,
                            enum obd_import_event event)
{
        struct client_obd *cli;
        int rc = 0;

        ENTRY;
        LASSERT(imp->imp_obd == obd);

        switch (event) {
        case IMP_EVENT_DISCON: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* mark the creator as recovering so precreates wait */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                        spin_unlock(&oscc->oscc_lock);
                }
                /* grant state is meaningless while disconnected; reset it
                 * so it is renegotiated at reconnect time */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                cli->cl_avail_grant = 0;
                cli->cl_lost_grant = 0;
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                break;
        }
        case IMP_EVENT_INACTIVE: {
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
                break;
        }
        case IMP_EVENT_INVALIDATE: {
                struct ldlm_namespace *ns = obd->obd_namespace;

                /* Reset grants */
                cli = &obd->u.cli;
                client_obd_list_lock(&cli->cl_loi_list_lock);
                /* all pages go to failing rpcs due to the invalid import */
                osc_check_rpcs(cli);
                client_obd_list_unlock(&cli->cl_loi_list_lock);

                /* drop all locks belonging to the now-invalid import */
                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

                break;
        }
        case IMP_EVENT_ACTIVE: {
                /* Only do this on the MDS OSC's */
                if (imp->imp_server_timeout) {
                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;

                        /* OST is reachable again; clear the no-space flag */
                        spin_lock(&oscc->oscc_lock);
                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
                        spin_unlock(&oscc->oscc_lock);
                }
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
                break;
        }
        case IMP_EVENT_OCD: {
                struct obd_connect_data *ocd = &imp->imp_connect_data;

                if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
                        osc_init_grant(&obd->u.cli, ocd);

                /* See bug 7198 */
                if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
                break;
        }
        default:
                /* unknown events indicate a state-machine bug; crash loudly */
                CERROR("Unknown import event %d\n", event);
                LBUG();
        }
        RETURN(rc);
}
4002
4003 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4004 {
4005         int rc;
4006         ENTRY;
4007
4008         ENTRY;
4009         rc = ptlrpcd_addref();
4010         if (rc)
4011                 RETURN(rc);
4012
4013         rc = client_obd_setup(obd, lcfg);
4014         if (rc) {
4015                 ptlrpcd_decref();
4016         } else {
4017                 struct lprocfs_static_vars lvars = { 0 };
4018                 struct client_obd *cli = &obd->u.cli;
4019
4020                 lprocfs_osc_init_vars(&lvars);
4021                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4022                         lproc_osc_attach_seqstat(obd);
4023                         sptlrpc_lprocfs_cliobd_attach(obd);
4024                         ptlrpc_lprocfs_register_obd(obd);
4025                 }
4026
4027                 oscc_init(obd);
4028                 /* We need to allocate a few requests more, because
4029                    brw_interpret tries to create new requests before freeing
4030                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4031                    reserved, but I afraid that might be too much wasted RAM
4032                    in fact, so 2 is just my guess and still should work. */
4033                 cli->cl_import->imp_rq_pool =
4034                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4035                                             OST_MAXREQSIZE,
4036                                             ptlrpc_add_rqs_to_pool);
4037                 cli->cl_cache = cache_create(obd);
4038                 if (!cli->cl_cache) {
4039                         osc_cleanup(obd);
4040                         rc = -ENOMEM;
4041                 }
4042         }
4043
4044         RETURN(rc);
4045 }
4046
4047 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4048 {
4049         int rc = 0;
4050         ENTRY;
4051
4052         switch (stage) {
4053         case OBD_CLEANUP_EARLY: {
4054                 struct obd_import *imp;
4055                 imp = obd->u.cli.cl_import;
4056                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4057                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4058                 ptlrpc_deactivate_import(imp);
4059                 spin_lock(&imp->imp_lock);
4060                 imp->imp_pingable = 0;
4061                 spin_unlock(&imp->imp_lock);
4062                 break;
4063         }
4064         case OBD_CLEANUP_EXPORTS: {
4065                 /* If we set up but never connected, the
4066                    client import will not have been cleaned. */
4067                 if (obd->u.cli.cl_import) {
4068                         struct obd_import *imp;
4069                         imp = obd->u.cli.cl_import;
4070                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4071                                obd->obd_name);
4072                         ptlrpc_invalidate_import(imp);
4073                         ptlrpc_free_rq_pool(imp->imp_rq_pool);
4074                         class_destroy_import(imp);
4075                         obd->u.cli.cl_import = NULL;
4076                 }
4077                 rc = obd_llog_finish(obd, 0);
4078                 if (rc != 0)
4079                         CERROR("failed to cleanup llogging subsystems\n");
4080                 break;
4081                 }
4082         }
4083         RETURN(rc);
4084 }
4085
/*
 * Final teardown of an OSC device, undoing osc_setup().  The order is
 * significant: procfs first (it references live structures), then the
 * object creator is told to exit, quota and cache are released, and
 * only then is the generic client state destroyed.
 */
int osc_cleanup(struct obd_device *obd)
{
        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
        int rc;

        ENTRY;
        /* remove procfs entries before their backing state goes away */
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);

        /* signal the object creator that the device is going down */
        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
        oscc->oscc_flags |= OSCC_FLAG_EXITING;
        spin_unlock(&oscc->oscc_lock);

        /* free memory of osc quota cache */
        lquota_cleanup(quota_interface, obd);

        cache_destroy(obd->u.cli.cl_cache);
        rc = client_obd_cleanup(obd);

        /* drop the reference taken by ptlrpcd_addref() in osc_setup() */
        ptlrpcd_decref();
        RETURN(rc);
}
4109
4110 static int osc_register_page_removal_cb(struct obd_export *exp,
4111                                         obd_page_removal_cb_t func,
4112                                         obd_pin_extent_cb pin_cb)
4113 {
4114         return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
4115                                            pin_cb);
4116 }
4117
4118 static int osc_unregister_page_removal_cb(struct obd_export *exp,
4119                                           obd_page_removal_cb_t func)
4120 {
4121         return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
4122 }
4123
4124 static int osc_register_lock_cancel_cb(struct obd_export *exp,
4125                                        obd_lock_cancel_cb cb)
4126 {
4127         LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4128
4129         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
4130         return 0;
4131 }
4132
4133 static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
4134                                          obd_lock_cancel_cb cb)
4135 {
4136         if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4137                 CERROR("Unregistering cancel cb %p, while only %p was "
4138                        "registered\n", cb,
4139                        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
4140                 RETURN(-EINVAL);
4141         }
4142
4143         exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4144         return 0;
4145 }
4146
4147 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4148 {
4149         struct lustre_cfg *lcfg = buf;
4150         struct lprocfs_static_vars lvars = { 0 };
4151         int rc = 0;
4152
4153         lprocfs_osc_init_vars(&lvars);
4154
4155         switch (lcfg->lcfg_command) {
4156         case LCFG_SPTLRPC_CONF:
4157                 rc = sptlrpc_cliobd_process_config(obd, lcfg);
4158                 break;
4159         default:
4160                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4161                                               lcfg, obd);
4162                 break;
4163         }
4164
4165         return(rc);
4166 }
4167
/* Method table exported by the OSC to the obd layer.  Connection
 * management is mostly delegated to the generic client_* helpers;
 * object, I/O, lock and llog handling use the osc_* implementations
 * in this file. */
struct obd_ops osc_obd_ops = {
        .o_owner                = THIS_MODULE,
        .o_setup                = osc_setup,
        .o_precleanup           = osc_precleanup,
        .o_cleanup              = osc_cleanup,
        /* connection handling shared with other client obds */
        .o_add_conn             = client_import_add_conn,
        .o_del_conn             = client_import_del_conn,
        .o_connect              = client_connect_import,
        .o_reconnect            = osc_reconnect,
        .o_disconnect           = osc_disconnect,
        .o_statfs               = osc_statfs,
        .o_statfs_async         = osc_statfs_async,
        .o_packmd               = osc_packmd,
        .o_unpackmd             = osc_unpackmd,
        .o_precreate            = osc_precreate,
        .o_create               = osc_create,
        .o_destroy              = osc_destroy,
        .o_getattr              = osc_getattr,
        .o_getattr_async        = osc_getattr_async,
        .o_setattr              = osc_setattr,
        .o_setattr_async        = osc_setattr_async,
        /* bulk I/O paths */
        .o_brw                  = osc_brw,
        .o_brw_async            = osc_brw_async,
        .o_prep_async_page      = osc_prep_async_page,
        .o_reget_short_lock     = osc_reget_short_lock,
        .o_release_short_lock   = osc_release_short_lock,
        .o_queue_async_io       = osc_queue_async_io,
        .o_set_async_flags      = osc_set_async_flags,
        .o_queue_group_io       = osc_queue_group_io,
        .o_trigger_group_io     = osc_trigger_group_io,
        .o_teardown_async_page  = osc_teardown_async_page,
        .o_punch                = osc_punch,
        .o_sync                 = osc_sync,
        /* DLM lock handling */
        .o_enqueue              = osc_enqueue,
        .o_match                = osc_match,
        .o_change_cbdata        = osc_change_cbdata,
        .o_cancel               = osc_cancel,
        .o_cancel_unused        = osc_cancel_unused,
        .o_join_lru             = osc_join_lru,
        .o_iocontrol            = osc_iocontrol,
        .o_get_info             = osc_get_info,
        .o_set_info_async       = osc_set_info_async,
        .o_import_event         = osc_import_event,
        .o_llog_init            = osc_llog_init,
        .o_llog_finish          = osc_llog_finish,
        .o_process_config       = osc_process_config,
        .o_register_page_removal_cb = osc_register_page_removal_cb,
        .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
        .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
        .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
};
4219 int __init osc_init(void)
4220 {
4221         struct lprocfs_static_vars lvars = { 0 };
4222         int rc;
4223         ENTRY;
4224
4225         lprocfs_osc_init_vars(&lvars);
4226
4227         request_module("lquota");
4228         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4229         lquota_init(quota_interface);
4230         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4231
4232         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4233                                  LUSTRE_OSC_NAME, NULL);
4234         if (rc) {
4235                 if (quota_interface)
4236                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4237                 RETURN(rc);
4238         }
4239
4240         RETURN(rc);
4241 }
4242
4243 #ifdef __KERNEL__
/*
 * Module teardown: release the quota interface and unregister the OSC
 * obd type.  Mirrors the setup order of osc_init().
 */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);
        /* drop the reference taken by PORTAL_SYMBOL_GET in osc_init() */
        if (quota_interface)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
}
4252
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

/* register osc_init/osc_exit as the module entry and exit points */
cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4258 #endif