/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#include <libcfs/libcfs.h>

#ifndef __KERNEL__
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"

static quota_interface_t *quota_interface = NULL;
extern quota_interface_t osc_quota_interface;

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
                         struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                      struct lov_stripe_md *lsm)
{
        int lmm_size;
        ENTRY;

        lmm_size = sizeof(**lmmp);
        if (!lmmp)
                RETURN(lmm_size);

        if (*lmmp && !lsm) {
                OBD_FREE(*lmmp, lmm_size);
                *lmmp = NULL;
                RETURN(0);
        }

        if (!*lmmp) {
                OBD_ALLOC(*lmmp, lmm_size);
                if (!*lmmp)
                        RETURN(-ENOMEM);
        }

        if (lsm) {
                LASSERT(lsm->lsm_object_id);
                LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
                (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
                (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
        }

        RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                        struct lov_mds_md *lmm, int lmm_bytes)
{
        int lsm_size;
        ENTRY;

        if (lmm != NULL) {
                if (lmm_bytes < sizeof(*lmm)) {
                        CERROR("lov_mds_md too small: %d, need %d\n",
                               lmm_bytes, (int)sizeof(*lmm));
                        RETURN(-EINVAL);
                }
                /* XXX LOV_MAGIC etc check? */

                if (lmm->lmm_object_id == 0) {
                        CERROR("lov_mds_md: zero lmm_object_id\n");
                        RETURN(-EINVAL);
                }
        }

        lsm_size = lov_stripe_md_size(1);
        if (lsmp == NULL)
                RETURN(lsm_size);

        if (*lsmp != NULL && lmm == NULL) {
                OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                OBD_FREE(*lsmp, lsm_size);
                *lsmp = NULL;
                RETURN(0);
        }

        if (*lsmp == NULL) {
                OBD_ALLOC(*lsmp, lsm_size);
                if (*lsmp == NULL)
                        RETURN(-ENOMEM);
                OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
                if ((*lsmp)->lsm_oinfo[0] == NULL) {
                        OBD_FREE(*lsmp, lsm_size);
                        RETURN(-ENOMEM);
                }
                loi_init((*lsmp)->lsm_oinfo[0]);
        }

        if (lmm != NULL) {
                /* XXX zero *lsmp? */
                (*lsmp)->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
                (*lsmp)->lsm_object_seq = le64_to_cpu(lmm->lmm_object_seq);
                LASSERT((*lsmp)->lsm_object_id);
                LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
        }

        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

        RETURN(lsm_size);
}
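
/*
 * Illustrative sketch (assumed caller usage, not code from this file): the
 * packmd/unpackmd methods above follow the usual size-query / allocate /
 * free convention of the obd_packmd() API, roughly:
 *
 *      struct lov_mds_md *lmm = NULL;
 *      int lmm_size;
 *
 *      lmm_size = obd_packmd(exp, NULL, lsm);   - NULL lmmp: just the size
 *      lmm_size = obd_packmd(exp, &lmm, lsm);   - allocates and packs *lmm
 *      ... write lmm_size bytes to disk ...
 *      obd_packmd(exp, &lmm, NULL);             - NULL lsm: frees *lmm
 *
 * osc_unpackmd() mirrors this: a NULL lsmp returns the in-memory lsm size,
 * and a NULL lmm with a non-NULL *lsmp frees the stripe MD instead.
 */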

static inline void osc_pack_capa(struct ptlrpc_request *req,
                                 struct ost_body *body, void *capa)
{
        struct obd_capa *oc = (struct obd_capa *)capa;
        struct lustre_capa *c;

        if (!capa)
                return;

        c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
        LASSERT(c);
        capa_cpy(c, oc);
        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
        DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
                                     struct obd_info *oinfo)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
                                     const struct req_msg_field *field,
                                     struct obd_capa *oc)
{
        if (oc == NULL)
                req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
        else
                /* it is already calculated as sizeof struct obd_capa */
                ;
}

static int osc_getattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_async_args *aa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body) {
                CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
                lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);

                /* This should really be sent by the OST */
                aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
                aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
        } else {
                CDEBUG(D_INFO, "can't unpack ost_body\n");
                rc = -EPROTO;
                aa->aa_oi->oi_oa->o_valid = 0;
        }
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}
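
/*
 * Illustrative sketch (assumed caller pattern, not code from this file):
 * osc_getattr_async() only queues the request on the caller's set; the
 * caller drives it, and oi_cb_up() fires from osc_getattr_interpret():
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *
 *      rc = obd_getattr_async(exp, oinfo, set);
 *      if (rc == 0)
 *              rc = ptlrpc_set_wait(set);
 *      ptlrpc_set_destroy(set);
 */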

static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        /* This should really be sent by the OST */
        oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

        EXIT;
 out:
        ptlrpc_req_finished(req);
        return rc;
}

static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
                       struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        int                    rc;
        ENTRY;

        LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);

        EXIT;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct osc_setattr_args *sa, int rc)
{
        struct ost_body *body;
        ENTRY;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
                           struct obd_trans_info *oti,
                           obd_enqueue_update_f upcall, void *cookie,
                           struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
                oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

        osc_pack_req_body(req, oinfo);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        } else {
                req->rq_interpret_reply =
                        (ptlrpc_interpterer_t)osc_setattr_interpret;

                CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
                sa = ptlrpc_req_async_args(req);
                sa->sa_oa = oinfo->oi_oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                if (rqset == PTLRPCD_SET)
                        ptlrpcd_add_req(req, PSCOPE_OTHER);
                else
                        ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}
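
/*
 * Note (summarizing the dispatch logic above): osc_setattr_async_base()
 * supports three queueing modes for the RPC:
 *
 *      rqset == NULL:        fire-and-forget via ptlrpcd; no interpret
 *                            callback is set, so the upcall never runs
 *      rqset == PTLRPCD_SET: interpret callback is set, but the ptlrpcd
 *                            daemon drives the request to completion
 *      otherwise:            the request joins the caller's set and the
 *                            caller is expected to wait on it
 */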

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                             struct obd_trans_info *oti,
                             struct ptlrpc_request_set *rqset)
{
        return osc_setattr_async_base(exp, oinfo, oti,
                                      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct lov_stripe_md  *lsm;
        int                    rc;
        ENTRY;

        LASSERT(oa);
        LASSERT(ea);

        lsm = *ea;
        if (!lsm) {
                rc = obd_alloc_memmd(exp, &lsm);
                if (rc < 0)
                        RETURN(rc);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        ptlrpc_request_set_replen(req);

        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            oa->o_flags == OBD_FL_DELORPHAN) {
                DEBUG_REQ(D_HA, req,
                          "delorphan from OST integration");
                /* Don't resend the delorphan req */
                req->rq_no_resend = req->rq_no_delay = 1;
        }

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        lustre_get_wire_obdo(oa, &body->oa);

        /* This should really be sent by the OST */
        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
        oa->o_valid |= OBD_MD_FLBLKSZ;

        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
         * have valid lsm_oinfo data structs, so don't go touching that.
         * This needs to be fixed in a big way.
         */
        lsm->lsm_object_id = oa->o_id;
        lsm->lsm_object_seq = oa->o_seq;
        *ea = lsm;

        if (oti != NULL) {
                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

                if (oa->o_valid & OBD_MD_FLCOOKIE) {
                        if (!oti->oti_logcookies)
                                oti_alloc_cookies(oti, 1);
                        *oti->oti_logcookies = oa->o_lcookie;
                }
        }

        CDEBUG(D_HA, "transno: "LPD64"\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        if (rc && !*ea)
                obd_free_memmd(exp, &lsm);
        RETURN(rc);
}
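
/*
 * Illustrative sketch (assumed caller pattern, not code from this file):
 * when *ea is NULL on entry, osc_real_create() allocates the stripe MD
 * itself and frees it again on failure, so a minimal caller is just:
 *
 *      struct lov_stripe_md *lsm = NULL;
 *
 *      rc = osc_real_create(exp, oa, &lsm, oti);
 *      if (rc == 0)
 *              ... lsm_object_id/lsm_object_seq now name the new object ...
 */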

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
                   obd_enqueue_update_f upcall, void *cookie,
                   struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request   *req;
        struct osc_setattr_args *sa;
        struct ost_body         *body;
        int                      rc;
        ENTRY;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
        CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(req);
        sa->sa_oa     = oinfo->oi_oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;
        if (rqset == PTLRPCD_SET)
                ptlrpcd_add_req(req, PSCOPE_OTHER);
        else
                ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                     struct obd_trans_info *oti,
                     struct ptlrpc_request_set *rqset)
{
        oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
        oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
        oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
        return osc_punch_base(exp, oinfo,
                              oinfo->oi_cb_up, oinfo, rqset);
}
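
/*
 * Note (summarizing the encoding above): OST_PUNCH overloads o_size and
 * o_blocks to carry the extent being punched, so a punch of
 * l_extent = [start, end] goes on the wire as o_size = start,
 * o_blocks = end.  A plain truncate to offset X would be expressed as the
 * extent [X, OBD_OBJECT_EOF] (an assumption about the callers; they are
 * not part of this file).
 */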

static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req,
                              void *arg, int rc)
{
        struct osc_async_args *aa = arg;
        struct ost_body *body;
        ENTRY;

        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *aa->aa_oi->oi_oa = body->oa;
out:
        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
        RETURN(rc);
}

static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
                    obd_size start, obd_size end,
                    struct ptlrpc_request_set *set)
{
        struct ptlrpc_request *req;
        struct ost_body       *body;
        struct osc_async_args *aa;
        int                    rc;
        ENTRY;

        if (!oinfo->oi_oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
        body->oa.o_size = start;
        body->oa.o_blocks = end;
        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
        osc_pack_capa(req, body, oinfo->oi_capa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oi = oinfo;

        ptlrpc_set_add_req(set, req);
        RETURN(0);
}

/* Find and cancel all local locks of mode @mode on the resource named by
 * @oa.  Matched locks are added to the @cancels list; returns the number
 * of locks added. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   cfs_list_t *cancels,
                                   ldlm_mode_t mode, int lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;
        ENTRY;

        osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (res == NULL)
                RETURN(0);

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        RETURN(count);
}
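
/*
 * Note (context for the helper above): the @cancels list filled in by
 * osc_resource_get_unused() is handed to ldlm_prep_elc_req() in
 * osc_destroy() below, piggybacking the "early lock cancel" of those
 * locks on the OST_DESTROY request itself, so no separate cancel RPCs
 * are needed.
 */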

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data,
                                 int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        cfs_atomic_dec(&cli->cl_destroy_in_flight);
        cfs_waitq_signal(&cli->cl_destroy_waitq);
        return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
        if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                cfs_waitq_signal(&cli->cl_destroy_waitq);
        }
        return 0;
}
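
/*
 * Note (explaining the lock-free throttle above): the increment is done
 * optimistically; if it overshoots cl_max_rpcs_in_flight, the slot is
 * released again by the decrement.  The second test catches the race
 * where another thread decremented between our two atomic operations,
 * in which case a waiter is woken to retry.  osc_destroy() below loops
 * in l_wait_event_exclusive() on this predicate until a slot is won.
 */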

/* Destroy requests can always be asynchronous on the client, and we don't
 * even really care about the return code, since the client cannot do
 * anything at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                       struct lov_stripe_md *ea, struct obd_trans_info *oti,
                       struct obd_export *md_export, void *capa)
{
        struct client_obd     *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body       *body;
        CFS_LIST_HEAD(cancels);
        int rc, count;
        ENTRY;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                oa->o_lcookie = *oti->oti_logcookies;
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&body->oa, oa);

        osc_pack_capa(req, body, (struct obd_capa *)capa);
        ptlrpc_request_set_replen(req);

        /* don't throttle destroy RPCs for the MDT */
        if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
                req->rq_interpret_reply = osc_destroy_interpret;
                if (!osc_can_send_destroy(cli)) {
                        struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
                                                          NULL);

                        /*
                         * Wait until the number of on-going destroy RPCs
                         * drops below cl_max_rpcs_in_flight.
                         */
                        l_wait_event_exclusive(cli->cl_destroy_waitq,
                                               osc_can_send_destroy(cli), &lwi);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req, PSCOPE_OTHER);
        RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_dirty = cli->cl_dirty;
        if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
                CERROR("dirty %lu - %lu > dirty_max %lu\n",
                       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else if (cfs_atomic_read(&obd_dirty_pages) -
                   cfs_atomic_read(&obd_dirty_transit_pages) >
                   obd_max_dirty_pages + 1) {
                /* The cfs_atomic_read()s and cfs_atomic_inc()s are not
                 * covered by a lock, so they may race harmlessly and still
                 * trip this CERROR() unless we add a small fudge factor (+1). */
                CERROR("dirty %d - %d > system dirty_max %d\n",
                       cfs_atomic_read(&obd_dirty_pages),
                       cfs_atomic_read(&obd_dirty_transit_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty, cli->cl_dirty_max);
                oa->o_undirty = 0;
        } else {
                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
                                (cli->cl_max_rpcs_in_flight + 1);
                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
        }
        oa->o_grant = cli->cl_avail_grant;
        oa->o_dropped = cli->cl_lost_grant;
        cli->cl_lost_grant = 0;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
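
/*
 * Worked example for the o_undirty computation above (illustrative,
 * assuming CFS_PAGE_SIZE = 4096): with cl_max_pages_per_rpc = 256 and
 * cl_max_rpcs_in_flight = 8,
 *
 *      max_in_flight = (256 << 12) * (8 + 1) = 9 MB
 *
 * so o_undirty, the additional dirty data the client could still cache,
 * is reported as max(cl_dirty_max, 9 MB).
 */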

static void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant =
                cfs_time_shift(cli->cl_grant_shrink_interval);
        CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

/* caller must hold loi_list_lock */
static void osc_consume_write_grant(struct client_obd *cli,
                                    struct brw_page *pga)
{
        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
        cfs_atomic_inc(&obd_dirty_pages);
        cli->cl_dirty += CFS_PAGE_SIZE;
        cli->cl_avail_grant -= CFS_PAGE_SIZE;
        pga->flag |= OBD_BRW_FROM_GRANT;
        CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
               CFS_PAGE_SIZE, pga, pga->pg);
        LASSERT(cli->cl_avail_grant >= 0);
        osc_update_next_shrink(cli);
}

/* the companion to osc_consume_write_grant, called when a brw has completed.
 * must be called with the loi lock held. */
static void osc_release_write_grant(struct client_obd *cli,
                                    struct brw_page *pga, int sent)
{
        int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
        ENTRY;

        LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
        if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
                EXIT;
                return;
        }

        pga->flag &= ~OBD_BRW_FROM_GRANT;
        cfs_atomic_dec(&obd_dirty_pages);
        cli->cl_dirty -= CFS_PAGE_SIZE;
        if (pga->flag & OBD_BRW_NOCACHE) {
                pga->flag &= ~OBD_BRW_NOCACHE;
                cfs_atomic_dec(&obd_dirty_transit_pages);
                cli->cl_dirty_transit -= CFS_PAGE_SIZE;
        }
        if (!sent) {
                cli->cl_lost_grant += CFS_PAGE_SIZE;
                CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
                       cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
        } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
                /* For short writes we shouldn't count parts of pages that
                 * span a whole block on the OST side, or our accounting goes
                 * wrong.  Should match the code in filter_grant_check. */
                int offset = pga->off & ~CFS_PAGE_MASK;
                int count = pga->count + (offset & (blocksize - 1));
                int end = (offset + pga->count) & (blocksize - 1);
                if (end)
                        count += blocksize - end;

                cli->cl_lost_grant += CFS_PAGE_SIZE - count;
                CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
                       CFS_PAGE_SIZE - count, cli->cl_lost_grant,
                       cli->cl_avail_grant, cli->cl_dirty);
        }

        EXIT;
}
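
/*
 * Worked example for the short-write branch above (illustrative, assuming
 * CFS_PAGE_SIZE = 4096 and an OST blocksize of 1024): a 100-byte write at
 * in-page offset 2000 gives
 *
 *      count = 100 + (2000 & 1023)        = 1076
 *      end   = (2000 + 100) & 1023        = 52
 *      count += 1024 - 52                 = 2048
 *
 * i.e. the write is rounded out to the two 1KB blocks it touches on the
 * OST, and only 4096 - 2048 = 2048 bytes of grant are counted as lost.
 */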

static unsigned long rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_r_in_flight + cli->cl_w_in_flight;
}

/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
        cfs_list_t *l, *tmp;
        struct osc_cache_waiter *ocw;

        ENTRY;
        cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                /* if we can't dirty more, we must wait until some is written */
                if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
                     obd_max_dirty_pages)) {
                        CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                               "osc max %ld, sys max %d\n", cli->cl_dirty,
                               cli->cl_dirty_max, obd_max_dirty_pages);
                        return;
                }

                /* if still dirty cache but no grant, wait for pending RPCs
                 * that may yet return us some grant before doing sync writes */
                if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
                               cli->cl_w_in_flight);
                        return;
                }

                ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
                cfs_list_del_init(&ocw->ocw_entry);
                if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
                        /* no more RPCs in flight to return grant, do sync IO */
                        ocw->ocw_rc = -EDQUOT;
                        CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
                } else {
                        osc_consume_write_grant(cli,
                                                &ocw->ocw_oap->oap_brw_page);
                }

                cfs_waitq_signal(&ocw->ocw_waitq);
        }

        EXIT;
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                              void *key, obd_count vallen, void *val,
                              struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *aa, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBDO_FREE(oa);
        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        client_obd_list_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        long target = (cli->cl_max_rpcs_in_flight + 1) *
                      cli->cl_max_pages_per_rpc;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target)
                target = cli->cl_max_pages_per_rpc;
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target);
}

int osc_shrink_grant_to_target(struct client_obd *cli, long target)
{
        int    rc = 0;
        struct ost_body     *body;
        ENTRY;

        client_obd_list_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit.
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target < cli->cl_max_pages_per_rpc)
                target = cli->cl_max_pages_per_rpc;

        if (target >= cli->cl_avail_grant) {
                client_obd_list_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        client_obd_list_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        client_obd_list_lock(&cli->cl_loi_list_lock);
        body->oa.o_grant = cli->cl_avail_grant - target;
        cli->cl_avail_grant = target;
        client_obd_list_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
        OBD_FREE_PTR(body);
        RETURN(rc);
}
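
/*
 * Note (summarizing the shrink policy above): osc_shrink_grant() first
 * targets enough grant for a full set of in-flight RPCs,
 * (cl_max_rpcs_in_flight + 1) * cl_max_pages_per_rpc; once available
 * grant is already at or below that, it targets a single RPC's worth,
 * which osc_shrink_grant_to_target() also enforces as a floor.  The
 * surplus, cl_avail_grant - target, is returned to the OST through a
 * KEY_GRANT_SHRINK set_info RPC and restored locally if that RPC fails.
 */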

#define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
static int osc_should_shrink_grant(struct client_obd *client)
{
        cfs_time_t time = cfs_time_current();
        cfs_time_t next_shrink = client->cl_next_shrink_grant;

        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
             OBD_CONNECT_GRANT_SHRINK) == 0)
                return 0;

        if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > GRANT_SHRINK_LIMIT)
                        return 1;
                else
                        osc_update_next_shrink(client);
        }
        return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
        struct client_obd *client;

        cfs_list_for_each_entry(client, &item->ti_obd_list,
                                cl_grant_shrink_list) {
                if (osc_should_shrink_grant(client))
                        osc_shrink_grant(client);
        }
        return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
        int rc;

        rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
                                       TIMEOUT_GRANT,
                                       osc_grant_shrink_grant_cb, NULL,
                                       &client->cl_grant_shrink_list);
        if (rc) {
                CERROR("add grant client %s error %d\n",
                       client->cl_import->imp_obd->obd_name, rc);
                return rc;
        }
        CDEBUG(D_CACHE, "add grant client %s\n",
               client->cl_import->imp_obd->obd_name);
        osc_update_next_shrink(client);
        return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
        return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
                                         TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we have
         * been evicted, it's the new avail_grant amount, and cl_dirty will
         * drop to 0 as in-flight RPCs fail out; otherwise, it's
         * avail_grant + dirty.
         *
         * The race is tolerable here: if we're evicted, but imp_state has
         * already left EVICTED state, then cl_dirty must be 0 already.
         */
        client_obd_list_lock(&cli->cl_loi_list_lock);
        if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
                cli->cl_avail_grant = ocd->ocd_grant;
        else
                cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

        if (cli->cl_avail_grant < 0) {
                CWARN("%s: available grant < 0, the OSS is probably not running"
                      " with the patch from bug20278 (%ld)\n",
                      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
                /* workaround for 1.6 servers which do not have
                 * the patch from bug20278 */
                cli->cl_avail_grant = ocd->ocd_grant;
        }

        client_obd_list_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
               cli->cl_import->imp_obd->obd_name,
               cli->cl_avail_grant, cli->cl_lost_grant);

        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
            cfs_list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = cfs_kmap(pga[i]->pg) +
                                (pga[i]->off & ~CFS_PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        cfs_kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                cfs_kunmap(pga[i]->pg);
                i++;
        }
}
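
/*
 * Worked example for handle_short_read() (illustrative): a three-page
 * read (4096 bytes per page) that returns nob_read = 5000 leaves pga[0]
 * untouched, zero-fills bytes [904, 4096) of pga[1] (EOF lands inside
 * it), and zero-fills all of pga[2].
 */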

static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           obd_count page_count, struct brw_page **pga)
{
        int     i;
        __s32   *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if (remote_rcs[i] < 0)
                        return remote_rcs[i];

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }

        if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT|
                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if ((p1->flag & mask) != (p2->flag & mask))
                        CERROR("is it ok to have flags 0x%x and 0x%x in the "
                               "same brw?\n", p1->flag, p2->flag);
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}
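
/*
 * Note (for can_merge_pages() above): two brw_pages coalesce into a
 * single remote niobuf only if their flags match and they are byte
 * contiguous, e.g. {off = 0, count = 4096} followed by
 * {off = 4096, count = 4096}.  osc_brw_prep_request() below relies on
 * this to compute niocount and to merge adjacent niobufs.
 */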

static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
                                   struct brw_page **pga, int opc,
                                   cksum_type_t cksum_type)
{
        __u32 cksum;
        int i = 0;

        LASSERT(pg_count > 0);
        cksum = init_checksum(cksum_type);
        while (nob > 0 && pg_count > 0) {
                unsigned char *ptr = cfs_kmap(pga[i]->pg);
                int off = pga[i]->off & ~CFS_PAGE_MASK;
                int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                        memcpy(ptr + off, "bad1", min(4, nob));
                cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
                cfs_kunmap(pga[i]->pg);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
                               off, cksum);

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        /* When sending, we only compute a wrong checksum instead of
         * corrupting the data, so the data is still correct on a resend */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        return cksum;
}

static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                struct lov_stripe_md *lsm, obd_count page_count,
                                struct brw_page **pga,
                                struct ptlrpc_request **reqp,
                                struct obd_capa *ocapa, int reserve,
                                int resend)
{
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body         *body;
        struct obd_ioobj        *ioobj;
        struct niobuf_remote    *niobuf;
        int niocount, i, requested_nob, opc, rc;
        struct osc_brw_async_args *aa;
        struct req_capsule      *pill;
        struct brw_page *pg_prev;

        ENTRY;
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                cli->cl_import->imp_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));
        osc_set_capa_size(req, &RMF_CAPA1, ocapa);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        if (opc == OST_WRITE)
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_GET_SOURCE, OST_BULK_PORTAL);
        else
                desc = ptlrpc_prep_bulk_imp(req, page_count,
                                            BULK_PUT_SINK, OST_BULK_PORTAL);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */

        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&body->oa, oa);

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        osc_pack_capa(req, body, ocapa);
        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];

                LASSERT(pg->count > 0);
                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
                         pg->off, pg->count);
#ifdef __linux__
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                         " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
#else
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u\n", i, page_count);
#endif
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));

                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
                                      pg->count);
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->len += pg->count;
                } else {
                        niobuf->offset = pg->off;
                        niobuf->len    = pg->count;
                        niobuf->flags  = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        cksum_type_t cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                                oa->o_flags &= OBD_FL_LOCAL_MASK;
                                body->oa.o_flags = 0;
                        }
                        body->oa.o_flags |= cksum_type_pack(cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        body->oa.o_cksum = osc_checksum_bulk(requested_nob,
                                                             page_count, pga,
                                                             OST_WRITE,
                                                             cksum_type);
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);
                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= cksum_type_pack(cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (unlikely(cli->cl_checksum) &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }
        }
        ptlrpc_request_set_replen(req);

        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = ptlrpc_req_async_args(req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
        if (ocapa && reserve)
                aa->aa_ocapa = capa_get(ocapa);

        *reqp = req;
        RETURN(0);

 out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}
1459
1460 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1461                                 __u32 client_cksum, __u32 server_cksum, int nob,
1462                                 obd_count page_count, struct brw_page **pga,
1463                                 cksum_type_t client_cksum_type)
1464 {
1465         __u32 new_cksum;
1466         char *msg;
1467         cksum_type_t cksum_type;
1468
1469         if (server_cksum == client_cksum) {
1470                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1471                 return 0;
1472         }
1473
1474         /* If this is an mmapped file, it can be changed at any time */
1475         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1476                 return 1;
1477
1478         if (oa->o_valid & OBD_MD_FLFLAGS)
1479                 cksum_type = cksum_type_unpack(oa->o_flags);
1480         else
1481                 cksum_type = OBD_CKSUM_CRC32;
1482
1483         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1484                                       cksum_type);
1485
1486         if (cksum_type != client_cksum_type)
1487                 msg = "the server did not use the checksum type specified in "
1488                       "the original request - likely a protocol problem";
1489         else if (new_cksum == server_cksum)
1490                 msg = "changed on the client after we checksummed it - "
1491                       "likely false positive due to mmap IO (bug 11742)";
1492         else if (new_cksum == client_cksum)
1493                 msg = "changed in transit before arrival at OST";
1494         else
1495                 msg = "changed in transit AND doesn't match the original - "
1496                       "likely false positive due to mmap IO (bug 11742)";
1497
1498         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1499                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1500                            msg, libcfs_nid2str(peer->nid),
1501                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1502                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1503                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1504                            oa->o_id,
1505                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1506                            pga[0]->off,
1507                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1508         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1509                "client csum now %x\n", client_cksum, client_cksum_type,
1510                server_cksum, cksum_type, new_cksum);
1511         return 1;
1512 }
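/*
 * A summary of the diagnosis above: client_cksum was computed when the
 * pages were sent, server_cksum on arrival at the OST, and new_cksum just
 * now from the same pages.
 *
 *   cksum types differ         -> server used a different algorithm;
 *                                 likely a protocol problem
 *   new_cksum == server_cksum  -> pages changed on the client after the
 *                                 send checksum (typically mmap IO)
 *   new_cksum == client_cksum  -> data changed in transit before arrival
 *                                 at the OST
 *   otherwise                  -> changed both locally and in transit
 */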
1513
1514 /* Note rc enters this function as the number of bytes transferred */
1515 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1516 {
1517         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1518         const lnet_process_id_t *peer =
1519                         &req->rq_import->imp_connection->c_peer;
1520         struct client_obd *cli = aa->aa_cli;
1521         struct ost_body *body;
1522         __u32 client_cksum = 0;
1523         ENTRY;
1524
1525         if (rc < 0 && rc != -EDQUOT) {
1526                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1527                 RETURN(rc);
1528         }
1529
1530         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1531         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1532         if (body == NULL) {
1533                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1534                 RETURN(-EPROTO);
1535         }
1536
1537 #ifdef HAVE_QUOTA_SUPPORT
1538         /* set/clear the over-quota flag for a uid/gid */
1539         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1540             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1541                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1542
1543                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1544                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1545                        body->oa.o_flags);
1546                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1547                              body->oa.o_flags);
1548         }
1549 #endif
1550
1551         osc_update_grant(cli, body);
1552
1553         if (rc < 0)
1554                 RETURN(rc);
1555
1556         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1557                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1558
1559         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1560                 if (rc > 0) {
1561                         CERROR("Unexpected +ve rc %d\n", rc);
1562                         RETURN(-EPROTO);
1563                 }
1564                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1565
1566                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1567                         RETURN(-EAGAIN);
1568
1569                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1570                     check_write_checksum(&body->oa, peer, client_cksum,
1571                                          body->oa.o_cksum, aa->aa_requested_nob,
1572                                          aa->aa_page_count, aa->aa_ppga,
1573                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1574                         RETURN(-EAGAIN);
1575
1576                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1577                                      aa->aa_page_count, aa->aa_ppga);
1578                 GOTO(out, rc);
1579         }
1580
1581         /* The rest of this function executes only for OST_READs */
1582
1583         /* if unwrap_bulk failed, return -EAGAIN to retry */
1584         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1585         if (rc < 0)
1586                 GOTO(out, rc = -EAGAIN);
1587
1588         if (rc > aa->aa_requested_nob) {
1589                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1590                        aa->aa_requested_nob);
1591                 RETURN(-EPROTO);
1592         }
1593
1594         if (rc != req->rq_bulk->bd_nob_transferred) {
1595                 CERROR("Unexpected rc %d (%d transferred)\n",
1596                        rc, req->rq_bulk->bd_nob_transferred);
1597                 RETURN(-EPROTO);
1598         }
1599
1600         if (rc < aa->aa_requested_nob)
1601                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1602
1603         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1604                 static int cksum_counter;
1605                 __u32      server_cksum = body->oa.o_cksum;
1606                 char      *via;
1607                 char      *router;
1608                 cksum_type_t cksum_type;
1609
1610                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1611                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1612                 else
1613                         cksum_type = OBD_CKSUM_CRC32;
1614                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1615                                                  aa->aa_ppga, OST_READ,
1616                                                  cksum_type);
1617
1618                 if (peer->nid == req->rq_bulk->bd_sender) {
1619                         via = router = "";
1620                 } else {
1621                         via = " via ";
1622                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1623                 }
1624
1625                 if (server_cksum == ~0 && rc > 0) {
1626                         CERROR("Protocol error: server %s set the 'checksum' "
1627                                "bit, but didn't send a checksum.  Not fatal, "
1628                                "but please notify on http://bugzilla.lustre.org/\n",
1629                                libcfs_nid2str(peer->nid));
1630                 } else if (server_cksum != client_cksum) {
1631                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1632                                            "%s%s%s inode "DFID" object "
1633                                            LPU64"/"LPU64" extent "
1634                                            "["LPU64"-"LPU64"]\n",
1635                                            req->rq_import->imp_obd->obd_name,
1636                                            libcfs_nid2str(peer->nid),
1637                                            via, router,
1638                                            body->oa.o_valid & OBD_MD_FLFID ?
1639                                                 body->oa.o_parent_seq : (__u64)0,
1640                                            body->oa.o_valid & OBD_MD_FLFID ?
1641                                                 body->oa.o_parent_oid : 0,
1642                                            body->oa.o_valid & OBD_MD_FLFID ?
1643                                                 body->oa.o_parent_ver : 0,
1644                                            body->oa.o_id,
1645                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1646                                                 body->oa.o_seq : (__u64)0,
1647                                            aa->aa_ppga[0]->off,
1648                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1649                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1650                                                                         1);
1651                         CERROR("client %x, server %x, cksum_type %x\n",
1652                                client_cksum, server_cksum, cksum_type);
1653                         cksum_counter = 0;
1654                         aa->aa_oa->o_cksum = client_cksum;
1655                         rc = -EAGAIN;
1656                 } else {
1657                         cksum_counter++;
1658                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1659                         rc = 0;
1660                 }
1661         } else if (unlikely(client_cksum)) {
1662                 static int cksum_missed;
1663
1664                 cksum_missed++;
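                /* cksum_missed & (-cksum_missed) isolates the lowest set
                 * bit, so the test below is true only when cksum_missed is
                 * a power of two -- an exponential rate limit on this
                 * error message */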
1665                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1666                         CERROR("Checksum %u requested from %s but not sent\n",
1667                                cksum_missed, libcfs_nid2str(peer->nid));
1668         } else {
1669                 rc = 0;
1670         }
1671 out:
1672         if (rc >= 0)
1673                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1674
1675         RETURN(rc);
1676 }
1677
1678 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1679                             struct lov_stripe_md *lsm,
1680                             obd_count page_count, struct brw_page **pga,
1681                             struct obd_capa *ocapa)
1682 {
1683         struct ptlrpc_request *req;
1684         int                    rc;
1685         cfs_waitq_t            waitq;
1686         int                    resends = 0;
1687         struct l_wait_info     lwi;
1688
1689         ENTRY;
1690
1691         cfs_waitq_init(&waitq);
1692
1693 restart_bulk:
1694         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1695                                   page_count, pga, &req, ocapa, 0, resends);
1696         if (rc != 0)
1697                 RETURN(rc);
1698
1699         rc = ptlrpc_queue_wait(req);
1700
1701         if (rc == -ETIMEDOUT && req->rq_resend) {
1702                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1703                 ptlrpc_req_finished(req);
1704                 goto restart_bulk;
1705         }
1706
1707         rc = osc_brw_fini_request(req, rc);
1708
1709         ptlrpc_req_finished(req);
1710         if (osc_recoverable_error(rc)) {
1711                 resends++;
1712                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1713                         CERROR("too many resend retries, returning error\n");
1714                         RETURN(-EIO);
1715                 }
1716
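                /* simple linear backoff: sleep 'resends' seconds before
                 * retrying the whole bulk request */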
1717                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1718                 l_wait_event(waitq, 0, &lwi);
1719
1720                 goto restart_bulk;
1721         }
1722
1723         RETURN(rc);
1724 }
1725
1726 int osc_brw_redo_request(struct ptlrpc_request *request,
1727                          struct osc_brw_async_args *aa)
1728 {
1729         struct ptlrpc_request *new_req;
1730         struct ptlrpc_request_set *set = request->rq_set;
1731         struct osc_brw_async_args *new_aa;
1732         struct osc_async_page *oap;
1733         int rc = 0;
1734         ENTRY;
1735
1736         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1737                 CERROR("too many resend retries, returning error\n");
1738                 RETURN(-EIO);
1739         }
1740
1741         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1742
1743         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1744                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1745                                   aa->aa_cli, aa->aa_oa,
1746                                   NULL /* lsm unused by osc currently */,
1747                                   aa->aa_page_count, aa->aa_ppga,
1748                                   &new_req, aa->aa_ocapa, 0, 1);
1749         if (rc)
1750                 RETURN(rc);
1751
1752         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1753
1754         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1755                 if (oap->oap_request != NULL) {
1756                         LASSERTF(request == oap->oap_request,
1757                                  "request %p != oap_request %p\n",
1758                                  request, oap->oap_request);
1759                         if (oap->oap_interrupted) {
1760                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1761                                 ptlrpc_req_finished(new_req);
1762                                 RETURN(-EINTR);
1763                         }
1764                 }
1765         }
1766         /* New request takes over pga and oaps from old request.
1767          * Note that copying a list_head doesn't work; it has to be moved... */
1768         aa->aa_resends++;
1769         new_req->rq_interpret_reply = request->rq_interpret_reply;
1770         new_req->rq_async_args = request->rq_async_args;
1771         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1772
1773         new_aa = ptlrpc_req_async_args(new_req);
1774
1775         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1776         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1777         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1778
1779         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1780                 if (oap->oap_request) {
1781                         ptlrpc_req_finished(oap->oap_request);
1782                         oap->oap_request = ptlrpc_request_addref(new_req);
1783                 }
1784         }
1785
1786         new_aa->aa_ocapa = aa->aa_ocapa;
1787         aa->aa_ocapa = NULL;
1788
1789         /* Using ptlrpc_set_add_req() is safe because interpret functions
1790          * run in check_set context.  The only path by which another thread
1791          * can reach this request is the -EINTR case above, and that path
1792          * is protected by cl_loi_list_lock. */
1793         ptlrpc_set_add_req(set, new_req);
1794
1795         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1796
1797         DEBUG_REQ(D_INFO, new_req, "new request");
1798         RETURN(0);
1799 }
1800
1801 /*
1802  * ugh, we want disk allocation on the target to happen in offset order.  We'll
1803  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1804  * fine for our small page arrays and doesn't require allocation.  It's an
1805  * insertion sort that swaps elements that are strides apart, shrinking the
1806  * stride down until it's 1 and the array is sorted.
1807  */
1808 static void sort_brw_pages(struct brw_page **array, int num)
1809 {
1810         int stride, i, j;
1811         struct brw_page *tmp;
1812
1813         if (num == 1)
1814                 return;
1815         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1816                 ;
1817
1818         do {
1819                 stride /= 3;
1820                 for (i = stride ; i < num ; i++) {
1821                         tmp = array[i];
1822                         j = i;
1823                         while (j >= stride && array[j - stride]->off > tmp->off) {
1824                                 array[j] = array[j - stride];
1825                                 j -= stride;
1826                         }
1827                         array[j] = tmp;
1828                 }
1829         } while (stride > 1);
1830 }
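/*
 * A worked example of the stride schedule (illustration only; num == 100
 * is hypothetical): the growth loop yields 1, 4, 13, 40, 121 and stops at
 * 121 >= num, after which the do/while sorts with strides 40, 13, 4 and
 * finally 1, where the last pass is a plain insertion sort over an
 * almost-sorted array.
 */
#if 0   /* not compiled -- sketch of the stride generation for num == 100 */
        int stride, num = 100;

        for (stride = 1; stride < num; stride = (stride * 3) + 1)
                ;               /* leaves stride == 121 */
        /* successive stride /= 3 passes then use 40, 13, 4, 1 */
#endif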
1831
1832 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1833 {
1834         int count = 1;
1835         int offset;
1836         int i = 0;
1837
1838         LASSERT(pages > 0);
1839         offset = pg[i]->off & ~CFS_PAGE_MASK;
1840
1841         for (;;) {
1842                 pages--;
1843                 if (pages == 0)         /* that's all */
1844                         return count;
1845
1846                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1847                         return count;   /* doesn't end on page boundary */
1848
1849                 i++;
1850                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1851                 if (offset != 0)        /* doesn't start on page boundary */
1852                         return count;
1853
1854                 count++;
1855         }
1856 }
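/*
 * Illustration of the scan above, assuming CFS_PAGE_SIZE == 4096: for
 * brw_pages covering [0, 4096), [4096, 8192) and [8192, 8292) the result
 * is 3, because a short final page is allowed when nothing follows it.
 * If the second page instead began 100 bytes into its page, the result
 * would be 1: the gap at its start would force a fragmented transfer.
 */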
1857
1858 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1859 {
1860         struct brw_page **ppga;
1861         int i;
1862
1863         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1864         if (ppga == NULL)
1865                 return NULL;
1866
1867         for (i = 0; i < count; i++)
1868                 ppga[i] = pga + i;
1869         return ppga;
1870 }
1871
1872 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1873 {
1874         LASSERT(ppga != NULL);
1875         OBD_FREE(ppga, sizeof(*ppga) * count);
1876 }
1877
1878 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1879                    obd_count page_count, struct brw_page *pga,
1880                    struct obd_trans_info *oti)
1881 {
1882         struct obdo *saved_oa = NULL;
1883         struct brw_page **ppga, **orig;
1884         struct obd_import *imp = class_exp2cliimp(exp);
1885         struct client_obd *cli;
1886         int rc, page_count_orig;
1887         ENTRY;
1888
1889         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1890         cli = &imp->imp_obd->u.cli;
1891
1892         if (cmd & OBD_BRW_CHECK) {
1893                 /* The caller just wants to know if there's a chance that this
1894                  * I/O can succeed */
1895
1896                 if (imp->imp_invalid)
1897                         RETURN(-EIO);
1898                 RETURN(0);
1899         }
1900
1901         /* test_brw with a failed create can trip this, maybe others. */
1902         LASSERT(cli->cl_max_pages_per_rpc);
1903
1904         rc = 0;
1905
1906         orig = ppga = osc_build_ppga(pga, page_count);
1907         if (ppga == NULL)
1908                 RETURN(-ENOMEM);
1909         page_count_orig = page_count;
1910
1911         sort_brw_pages(ppga, page_count);
1912         while (page_count) {
1913                 obd_count pages_per_brw;
1914
1915                 if (page_count > cli->cl_max_pages_per_rpc)
1916                         pages_per_brw = cli->cl_max_pages_per_rpc;
1917                 else
1918                         pages_per_brw = page_count;
1919
1920                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1921
1922                 if (saved_oa != NULL) {
1923                         /* restore previously saved oa */
1924                         *oinfo->oi_oa = *saved_oa;
1925                 } else if (page_count > pages_per_brw) {
1926                         /* save a copy of oa (brw will clobber it) */
1927                         OBDO_ALLOC(saved_oa);
1928                         if (saved_oa == NULL)
1929                                 GOTO(out, rc = -ENOMEM);
1930                         *saved_oa = *oinfo->oi_oa;
1931                 }
1932
1933                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1934                                       pages_per_brw, ppga, oinfo->oi_capa);
1935
1936                 if (rc != 0)
1937                         break;
1938
1939                 page_count -= pages_per_brw;
1940                 ppga += pages_per_brw;
1941         }
1942
1943 out:
1944         osc_release_ppga(orig, page_count_orig);
1945
1946         if (saved_oa != NULL)
1947                 OBDO_FREE(saved_oa);
1948
1949         RETURN(rc);
1950 }
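/*
 * Illustration of the chunking loop above (hypothetical numbers): with
 * cl_max_pages_per_rpc == 256 and 600 sorted, fully contiguous pages,
 * osc_brw() issues three RPCs of 256, 256 and 88 pages, saving a copy of
 * the oa before the first RPC and restoring it before each later one,
 * since osc_brw_internal() clobbers it.
 */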
1951
1952 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1953  * the dirty accounting: either writeback completed or a truncate arrived
1954  * before writing started.  Must be called with the loi lock held. */
1955 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1956                            int sent)
1957 {
1958         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1959 }
1960
1961
1962 /* This maintains the lists of pending pages to read/write for a given object
1963  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1964  * to quickly find objects that are ready to send an RPC. */
1965 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1966                          int cmd)
1967 {
1968         int optimal;
1969         ENTRY;
1970
1971         if (lop->lop_num_pending == 0)
1972                 RETURN(0);
1973
1974         /* if we have an invalid import we want to drain the queued pages
1975          * by forcing them through rpcs that immediately fail and complete
1976          * the pages.  recovery relies on this to empty the queued pages
1977          * before canceling the locks and evicting down the llite pages */
1978         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1979                 RETURN(1);
1980
1981         /* stream rpcs in queue order as long as there is an urgent page
1982          * queued.  this is our cheap solution for good batching in the case
1983          * where writepage marks some random page in the middle of the file
1984          * as urgent because of, say, memory pressure */
1985         if (!cfs_list_empty(&lop->lop_urgent)) {
1986                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1987                 RETURN(1);
1988         }
1989         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1990         optimal = cli->cl_max_pages_per_rpc;
1991         if (cmd & OBD_BRW_WRITE) {
1992                 /* trigger a write rpc stream as long as there are dirtiers
1993                  * waiting for space.  as they're waiting, they're not going to
1994                  * create more pages to coalesce with what's waiting.. */
1995                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1996                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1997                         RETURN(1);
1998                 }
1999                 /* +16 to avoid triggering rpcs that would want to include pages
2000                  * that are being queued but which can't be made ready until
2001                  * the queuer finishes with the page. this is a wart for
2002                  * llite::commit_write() */
2003                 optimal += 16;
2004         }
2005         if (lop->lop_num_pending >= optimal)
2006                 RETURN(1);
2007
2008         RETURN(0);
2009 }
2010
2011 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2012 {
2013         struct osc_async_page *oap;
2014         ENTRY;
2015
2016         if (cfs_list_empty(&lop->lop_urgent))
2017                 RETURN(0);
2018
2019         oap = cfs_list_entry(lop->lop_urgent.next,
2020                              struct osc_async_page, oap_urgent_item);
2021
2022         if (oap->oap_async_flags & ASYNC_HP) {
2023                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2024                 RETURN(1);
2025         }
2026
2027         RETURN(0);
2028 }
2029
2030 static void on_list(cfs_list_t *item, cfs_list_t *list,
2031                     int should_be_on)
2032 {
2033         if (cfs_list_empty(item) && should_be_on)
2034                 cfs_list_add_tail(item, list);
2035         else if (!cfs_list_empty(item) && !should_be_on)
2036                 cfs_list_del_init(item);
2037 }
2038
2039 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2040  * can find pages to build into rpcs quickly */
2041 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2042 {
2043         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2044             lop_makes_hprpc(&loi->loi_read_lop)) {
2045                 /* HP rpc */
2046                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2047                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2048         } else {
2049                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2050                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2051                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2052                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2053         }
2054
2055         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2056                 loi->loi_write_lop.lop_num_pending);
2057
2058         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2059                 loi->loi_read_lop.lop_num_pending);
2060 }
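/*
 * For example: an loi whose write lop has an ASYNC_HP page at the head of
 * lop_urgent sits only on cl_loi_hp_ready_list; once that page is gone it
 * moves back to cl_loi_ready_list if lop_makes_rpc() still sees enough
 * urgent or pending pages.  Membership on the read/write lists simply
 * tracks whether lop_num_pending is non-zero.
 */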
2061
2062 static void lop_update_pending(struct client_obd *cli,
2063                                struct loi_oap_pages *lop, int cmd, int delta)
2064 {
2065         lop->lop_num_pending += delta;
2066         if (cmd & OBD_BRW_WRITE)
2067                 cli->cl_pending_w_pages += delta;
2068         else
2069                 cli->cl_pending_r_pages += delta;
2070 }
2071
2072 /**
2073  * This is called when a sync waiter receives an interruption.  Its job is to
2074  * get the caller woken as soon as possible.  If its page hasn't been put in an
2075  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2076  * desiring interruption, which will forcefully complete the rpc once it
2077  * has timed out.
2078  */
2079 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2080 {
2081         struct loi_oap_pages *lop;
2082         struct lov_oinfo *loi;
2083         int rc = -EBUSY;
2084         ENTRY;
2085
2086         LASSERT(!oap->oap_interrupted);
2087         oap->oap_interrupted = 1;
2088
2089         /* ok, it's been put in an rpc. only one oap gets a request reference */
2090         if (oap->oap_request != NULL) {
2091                 ptlrpc_mark_interrupted(oap->oap_request);
2092                 ptlrpcd_wake(oap->oap_request);
2093                 ptlrpc_req_finished(oap->oap_request);
2094                 oap->oap_request = NULL;
2095         }
2096
2097         /*
2098          * page completion may be called only if ->cpo_prep() was executed
2099          * by osc_io_submit(), which also adds the page to the pending list
2100          */
2101         if (!cfs_list_empty(&oap->oap_pending_item)) {
2102                 cfs_list_del_init(&oap->oap_pending_item);
2103                 cfs_list_del_init(&oap->oap_urgent_item);
2104
2105                 loi = oap->oap_loi;
2106                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2107                         &loi->loi_write_lop : &loi->loi_read_lop;
2108                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2109                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2110                 rc = oap->oap_caller_ops->ap_completion(env,
2111                                           oap->oap_caller_data,
2112                                           oap->oap_cmd, NULL, -EINTR);
2113         }
2114
2115         RETURN(rc);
2116 }
2117
2118 /* this is trying to propagate async writeback errors back up to the
2119  * application.  When an async write fails we record the error code so it
2120  * can be returned by a later fsync.  As long as errors persist we force
2121  * future rpcs to be sync so that the app can get a sync error and break
2122  * the cycle of queueing pages for which writeback will fail. */
2123 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2124                            int rc)
2125 {
2126         if (rc) {
2127                 if (!ar->ar_rc)
2128                         ar->ar_rc = rc;
2129
2130                 ar->ar_force_sync = 1;
2131                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2132                 return;
2133         }
2134 
2136         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2137                 ar->ar_force_sync = 0;
2138 }
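/*
 * A sketch of the latch above with hypothetical xids (illustration only):
 */
#if 0   /* not compiled */
        struct osc_async_rc ar = { 0 };

        osc_process_ar(&ar, 17, -EIO); /* ar_rc latched to -EIO, force_sync
                                        * set, ar_min_xid sampled (say 18) */
        osc_process_ar(&ar, 17, 0);    /* xid < ar_min_xid: still forced */
        osc_process_ar(&ar, 18, 0);    /* xid >= ar_min_xid: force_sync
                                        * cleared; ar_rc stays for fsync */
#endif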
2139
2140 void osc_oap_to_pending(struct osc_async_page *oap)
2141 {
2142         struct loi_oap_pages *lop;
2143
2144         if (oap->oap_cmd & OBD_BRW_WRITE)
2145                 lop = &oap->oap_loi->loi_write_lop;
2146         else
2147                 lop = &oap->oap_loi->loi_read_lop;
2148
2149         if (oap->oap_async_flags & ASYNC_HP)
2150                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2151         else if (oap->oap_async_flags & ASYNC_URGENT)
2152                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2153         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2154         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2155 }
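/* Note the ordering above: ASYNC_HP pages go to the head of lop_urgent so
 * lop_makes_hprpc() sees them first, merely-urgent pages go to the tail,
 * and every page also joins the tail of lop_pending in arrival order. */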
2156
2157 /* this must be called holding the loi list lock, which covers exit_cache,
2158  * async_flag maintenance, and oap_request */
2159 static void osc_ap_completion(const struct lu_env *env,
2160                               struct client_obd *cli, struct obdo *oa,
2161                               struct osc_async_page *oap, int sent, int rc)
2162 {
2163         __u64 xid = 0;
2164
2165         ENTRY;
2166         if (oap->oap_request != NULL) {
2167                 xid = ptlrpc_req_xid(oap->oap_request);
2168                 ptlrpc_req_finished(oap->oap_request);
2169                 oap->oap_request = NULL;
2170         }
2171
2172         cfs_spin_lock(&oap->oap_lock);
2173         oap->oap_async_flags = 0;
2174         cfs_spin_unlock(&oap->oap_lock);
2175         oap->oap_interrupted = 0;
2176
2177         if (oap->oap_cmd & OBD_BRW_WRITE) {
2178                 osc_process_ar(&cli->cl_ar, xid, rc);
2179                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2180         }
2181
2182         if (rc == 0 && oa != NULL) {
2183                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2184                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2185                 if (oa->o_valid & OBD_MD_FLMTIME)
2186                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2187                 if (oa->o_valid & OBD_MD_FLATIME)
2188                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2189                 if (oa->o_valid & OBD_MD_FLCTIME)
2190                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2191         }
2192
2193         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2194                                                 oap->oap_cmd, oa, rc);
2195
2196         /* ll_ap_completion (from llite) drops PG_locked.  So a new
2197          * I/O on the page could start, but OSC calls it under lock
2198          * and thus we can add the oap back to pending safely */
2199         if (rc)
2200                 /* upper layer wants to leave the page on pending queue */
2201                 osc_oap_to_pending(oap);
2202         else
2203                 osc_exit_cache(cli, oap, sent);
2204         EXIT;
2205 }
2206
2207 static int brw_interpret(const struct lu_env *env,
2208                          struct ptlrpc_request *req, void *data, int rc)
2209 {
2210         struct osc_brw_async_args *aa = data;
2211         struct client_obd *cli;
2212         int async;
2213         ENTRY;
2214
2215         rc = osc_brw_fini_request(req, rc);
2216         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2217         if (osc_recoverable_error(rc)) {
2218                 /* Only retry once for mmapped files since the mmapped page
2219                  * might be modified at any time.  We have to retry at least
2220                  * once in case there really WAS corruption of the page on
2221                  * the network that was not caused by mmap() modifying the
2222                  * page. b=11742 */
2223                 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2224                     aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2225                     aa->aa_oa->o_flags & OBD_FL_MMAP) {
2226                         rc = 0;
2227                 } else {
2228                         rc = osc_brw_redo_request(req, aa);
2229                         if (rc == 0)
2230                                 RETURN(0);
2231                 }
2232         }
2233
2234         if (aa->aa_ocapa) {
2235                 capa_put(aa->aa_ocapa);
2236                 aa->aa_ocapa = NULL;
2237         }
2238
2239         cli = aa->aa_cli;
2240
2241         client_obd_list_lock(&cli->cl_loi_list_lock);
2242
2243         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2244          * is called so we know whether to go to sync BRWs or wait for more
2245          * RPCs to complete */
2246         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2247                 cli->cl_w_in_flight--;
2248         else
2249                 cli->cl_r_in_flight--;
2250
2251         async = cfs_list_empty(&aa->aa_oaps);
2252         if (!async) { /* from osc_send_oap_rpc() */
2253                 struct osc_async_page *oap, *tmp;
2254                 /* the caller may re-use the oap after the completion call so
2255                  * we need to clean it up a little */
2256                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2257                                              oap_rpc_item) {
2258                         cfs_list_del_init(&oap->oap_rpc_item);
2259                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2260                 }
2261                 OBDO_FREE(aa->aa_oa);
2262         } else { /* from async_internal() */
2263                 obd_count i;
2264                 for (i = 0; i < aa->aa_page_count; i++)
2265                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2266         }
2267         osc_wake_cache_waiters(cli);
2268         osc_check_rpcs(env, cli);
2269         client_obd_list_unlock(&cli->cl_loi_list_lock);
2270         if (!async)
2271                 cl_req_completion(env, aa->aa_clerq, rc);
2272         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2273
2274         RETURN(rc);
2275 }
2276
2277 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2278                                             struct client_obd *cli,
2279                                             cfs_list_t *rpc_list,
2280                                             int page_count, int cmd)
2281 {
2282         struct ptlrpc_request *req;
2283         struct brw_page **pga = NULL;
2284         struct osc_brw_async_args *aa;
2285         struct obdo *oa = NULL;
2286         const struct obd_async_page_ops *ops = NULL;
2287         void *caller_data = NULL;
2288         struct osc_async_page *oap;
2289         struct osc_async_page *tmp;
2290         struct ost_body *body;
2291         struct cl_req *clerq = NULL;
2292         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2293         struct ldlm_lock *lock = NULL;
2294         struct cl_req_attr crattr;
2295         int i, rc, mpflag = 0;
2296
2297         ENTRY;
2298         LASSERT(!cfs_list_empty(rpc_list));
2299
2300         if (cmd & OBD_BRW_MEMALLOC)
2301                 mpflag = cfs_memory_pressure_get_and_set();
2302
2303         memset(&crattr, 0, sizeof crattr);
2304         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2305         if (pga == NULL)
2306                 GOTO(out, req = ERR_PTR(-ENOMEM));
2307
2308         OBDO_ALLOC(oa);
2309         if (oa == NULL)
2310                 GOTO(out, req = ERR_PTR(-ENOMEM));
2311
2312         i = 0;
2313         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2314                 struct cl_page *page = osc_oap2cl_page(oap);
2315                 if (ops == NULL) {
2316                         ops = oap->oap_caller_ops;
2317                         caller_data = oap->oap_caller_data;
2318
2319                         clerq = cl_req_alloc(env, page, crt,
2320                                              1 /* only 1-object rpcs for
2321                                                 * now */);
2322                         if (IS_ERR(clerq))
2323                                 GOTO(out, req = (void *)clerq);
2324                         lock = oap->oap_ldlm_lock;
2325                 }
2326                 pga[i] = &oap->oap_brw_page;
2327                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2328                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2329                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2330                 i++;
2331                 cl_req_page_add(env, clerq, page);
2332         }
2333
2334         /* always get the data for the obdo for the rpc */
2335         LASSERT(ops != NULL);
2336         crattr.cra_oa = oa;
2337         crattr.cra_capa = NULL;
2338         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2339         if (lock) {
2340                 oa->o_handle = lock->l_remote_handle;
2341                 oa->o_valid |= OBD_MD_FLHANDLE;
2342         }
2343
2344         rc = cl_req_prep(env, clerq);
2345         if (rc != 0) {
2346                 CERROR("cl_req_prep failed: %d\n", rc);
2347                 GOTO(out, req = ERR_PTR(rc));
2348         }
2349
2350         sort_brw_pages(pga, page_count);
2351         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2352                                   pga, &req, crattr.cra_capa, 1, 0);
2353         if (rc != 0) {
2354                 CERROR("prep_req failed: %d\n", rc);
2355                 GOTO(out, req = ERR_PTR(rc));
2356         }
2357
2358         if (cmd & OBD_BRW_MEMALLOC)
2359                 req->rq_memalloc = 1;
2360
2361         /* Need to update the timestamps after the request is built in case
2362          * we race with setattr (locally or in queue at the OST).  If the OST
2363          * gets a later setattr before an earlier BRW (as determined by xid),
2364          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2365          * way to do this in a single call.  bug 10150 */
2366         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2367         cl_req_attr_set(env, clerq, &crattr,
2368                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2369
2370         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2371         aa = ptlrpc_req_async_args(req);
2372         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2373         cfs_list_splice(rpc_list, &aa->aa_oaps);
2374         CFS_INIT_LIST_HEAD(rpc_list);
2375         aa->aa_clerq = clerq;
2376 out:
2377         if (cmd & OBD_BRW_MEMALLOC)
2378                 cfs_memory_pressure_restore(mpflag);
2379
2380         capa_put(crattr.cra_capa);
2381         if (IS_ERR(req)) {
2382                 if (oa)
2383                         OBDO_FREE(oa);
2384                 if (pga)
2385                         OBD_FREE(pga, sizeof(*pga) * page_count);
2386                 /* this should happen rarely and is pretty bad; it makes the
2387                  * pending list not follow the dirty order */
2388                 client_obd_list_lock(&cli->cl_loi_list_lock);
2389                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2390                         cfs_list_del_init(&oap->oap_rpc_item);
2391
2392                         /* queued sync pages can be torn down while the pages
2393                          * were between the pending list and the rpc */
2394                         if (oap->oap_interrupted) {
2395                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2396                                 osc_ap_completion(env, cli, NULL, oap, 0,
2397                                                   oap->oap_count);
2398                                 continue;
2399                         }
2400                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2401                 }
2402                 if (clerq && !IS_ERR(clerq))
2403                         cl_req_completion(env, clerq, PTR_ERR(req));
2404         }
2405         RETURN(req);
2406 }
2407
2408 /**
2409  * Prepare pages for async IO and put them in the send queue.
2410  *
2411  * \param cmd OBD_BRW_* macros
2412  * \param lop pending pages
2413  *
2414  * \return zero if no pages were added to the send queue.
2415  * \return 1 if pages were successfully added to the send queue.
2416  * \return negative on error.
2417  */
2418 static int
2419 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2420                  struct lov_oinfo *loi,
2421                  int cmd, struct loi_oap_pages *lop)
2422 {
2423         struct ptlrpc_request *req;
2424         obd_count page_count = 0;
2425         struct osc_async_page *oap = NULL, *tmp;
2426         struct osc_brw_async_args *aa;
2427         const struct obd_async_page_ops *ops;
2428         CFS_LIST_HEAD(rpc_list);
2429         CFS_LIST_HEAD(tmp_list);
2430         unsigned int ending_offset;
2431         unsigned int starting_offset = 0;
2432         int srvlock = 0, mem_tight = 0;
2433         struct cl_object *clob = NULL;
2434         ENTRY;
2435
2436         /* ASYNC_HP pages first. At present, when the lock covering the
2437          * pages is to be cancelled, the pages under the lock will be sent
2438          * out with ASYNC_HP. We have to send them out as soon as possible. */
2439         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2440                 if (oap->oap_async_flags & ASYNC_HP)
2441                         cfs_list_move(&oap->oap_pending_item, &tmp_list);
2442                 else
2443                         cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2444                 if (++page_count >= cli->cl_max_pages_per_rpc)
2445                         break;
2446         }
2447
2448         cfs_list_splice(&tmp_list, &lop->lop_pending);
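        /* after the splice, lop_pending begins with the urgent pages,
         * ASYNC_HP entries first, so the gathering loop below packs them
         * into the next RPC ahead of ordinary pending pages */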
2449         page_count = 0;
2450
2451         /* first we find the pages we're allowed to work with */
2452         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2453                                      oap_pending_item) {
2454                 ops = oap->oap_caller_ops;
2455
2456                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2457                          "magic 0x%x\n", oap, oap->oap_magic);
2458
2459                 if (clob == NULL) {
2460                         /* pin object in memory, so that completion call-backs
2461                          * can be safely called under client_obd_list lock. */
2462                         clob = osc_oap2cl_page(oap)->cp_obj;
2463                         cl_object_get(clob);
2464                 }
2465
2466                 if (page_count != 0 &&
2467                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2468                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2469                                " oap %p, page %p, srvlock %u\n",
2470                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2471                         break;
2472                 }
2473
2474                 /* If there is a gap at the start of this page, it can't merge
2475                  * with any previous page, so we'll hand the network a
2476                  * "fragmented" page array that it can't transfer in 1 RDMA */
2477                 if (page_count != 0 && oap->oap_page_off != 0)
2478                         break;
2479
2480                 /* in llite being 'ready' equates to the page being locked
2481                  * until completion unlocks it.  commit_write submits a page
2482                  * as not ready because its unlock will happen unconditionally
2483                  * as the call returns.  if we race with commit_write giving
2484                  * us that page we don't want to create a hole in the page
2485                  * stream, so we stop and leave the rpc to be fired by
2486                  * another dirtier or kupdated interval (the not ready page
2487                  * will still be on the dirty list).  we could call in
2488                  * at the end of ll_file_write to process the queue again. */
2489                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2490                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2491                                                     cmd);
2492                         if (rc < 0)
2493                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2494                                                 "instead of ready\n", oap,
2495                                                 oap->oap_page, rc);
2496                         switch (rc) {
2497                         case -EAGAIN:
2498                                 /* llite is telling us that the page is still
2499                                  * in commit_write and that we should try
2500                                  * and put it in an rpc again later.  we
2501                                  * break out of the loop so we don't create
2502                                  * a hole in the sequence of pages in the rpc
2503                                  * stream. */
2504                                 oap = NULL;
2505                                 break;
2506                         case -EINTR:
2507                                 /* the io isn't needed; tell the checks
2508                                  * below to complete the rpc with EINTR */
2509                                 cfs_spin_lock(&oap->oap_lock);
2510                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2511                                 cfs_spin_unlock(&oap->oap_lock);
2512                                 oap->oap_count = -EINTR;
2513                                 break;
2514                         case 0:
2515                                 cfs_spin_lock(&oap->oap_lock);
2516                                 oap->oap_async_flags |= ASYNC_READY;
2517                                 cfs_spin_unlock(&oap->oap_lock);
2518                                 break;
2519                         default:
2520                                 LASSERTF(0, "oap %p page %p returned %d "
2521                                             "from make_ready\n", oap,
2522                                             oap->oap_page, rc);
2523                                 break;
2524                         }
2525                 }
2526                 if (oap == NULL)
2527                         break;
2528                 /*
2529                  * Page submitted for IO has to be locked. Either by
2530                  * ->ap_make_ready() or by higher layers.
2531                  */
2532 #if defined(__KERNEL__) && defined(__linux__)
2533                 {
2534                         struct cl_page *page;
2535
2536                         page = osc_oap2cl_page(oap);
2537
2538                         if (page->cp_type == CPT_CACHEABLE &&
2539                             !(PageLocked(oap->oap_page) &&
2540                               (CheckWriteback(oap->oap_page, cmd)))) {
2541                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2542                                        oap->oap_page,
2543                                        (long)oap->oap_page->flags,
2544                                        oap->oap_async_flags);
2545                                 LBUG();
2546                         }
2547                 }
2548 #endif
2549
2550                 /* take the page out of our book-keeping */
2551                 cfs_list_del_init(&oap->oap_pending_item);
2552                 lop_update_pending(cli, lop, cmd, -1);
2553                 cfs_list_del_init(&oap->oap_urgent_item);
2554
2555                 if (page_count == 0)
2556                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2557                                           (PTLRPC_MAX_BRW_SIZE - 1);
2558
2559                 /* ask the caller for the size of the io as the rpc leaves. */
2560                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2561                         oap->oap_count =
2562                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2563                                                       cmd);
2564                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2565                 }
2566                 if (oap->oap_count <= 0) {
2567                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2568                                oap->oap_count);
2569                         osc_ap_completion(env, cli, NULL,
2570                                           oap, 0, oap->oap_count);
2571                         continue;
2572                 }
2573
2574                 /* now put the page back in our accounting */
2575                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2576                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2577                         mem_tight = 1;
2578                 if (page_count == 0)
2579                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2580                 if (++page_count >= cli->cl_max_pages_per_rpc)
2581                         break;
2582
2583                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2584                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2585                  * have the same alignment as the initial writes that allocated
2586                  * extents on the server. */
2587                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2588                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2589                 if (ending_offset == 0)
2590                         break;
2591
2592                 /* If there is a gap at the end of this page, it can't merge
2593                  * with any subsequent pages, so we'll hand the network a
2594                  * "fragmented" page array that it can't transfer in 1 RDMA */
2595                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2596                         break;
2597         }
2598
2599         osc_wake_cache_waiters(cli);
2600
2601         loi_list_maint(cli, loi);
2602
2603         client_obd_list_unlock(&cli->cl_loi_list_lock);
2604
2605         if (clob != NULL)
2606                 cl_object_put(env, clob);
2607
2608         if (page_count == 0) {
2609                 client_obd_list_lock(&cli->cl_loi_list_lock);
2610                 RETURN(0);
2611         }
2612
2613         req = osc_build_req(env, cli, &rpc_list, page_count,
2614                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2615         if (IS_ERR(req)) {
2616                 LASSERT(cfs_list_empty(&rpc_list));
2617                 loi_list_maint(cli, loi);
2618                 RETURN(PTR_ERR(req));
2619         }
2620
2621         aa = ptlrpc_req_async_args(req);
2622
2623         if (cmd == OBD_BRW_READ) {
2624                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2625                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2626                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2627                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2628         } else {
2629                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2630                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2631                                  cli->cl_w_in_flight);
2632                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2633                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2634         }
2635         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2636
2637         client_obd_list_lock(&cli->cl_loi_list_lock);
2638
2639         if (cmd == OBD_BRW_READ)
2640                 cli->cl_r_in_flight++;
2641         else
2642                 cli->cl_w_in_flight++;
2643
2644         /* queued sync pages can be torn down while the pages
2645          * were between the pending list and the rpc */
2646         tmp = NULL;
2647         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2648                 /* only one oap gets a request reference */
2649                 if (tmp == NULL)
2650                         tmp = oap;
2651                 if (oap->oap_interrupted && !req->rq_intr) {
2652                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2653                                oap, req);
2654                         ptlrpc_mark_interrupted(req);
2655                 }
2656         }
2657         if (tmp != NULL)
2658                 tmp->oap_request = ptlrpc_request_addref(req);
2659
2660         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2661                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2662
2663         req->rq_interpret_reply = brw_interpret;
2664         ptlrpcd_add_req(req, PSCOPE_BRW);
2665         RETURN(1);
2666 }
2667
2668 #define LOI_DEBUG(LOI, STR, args...)                                     \
2669         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2670                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2671                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2672                (LOI)->loi_write_lop.lop_num_pending,                     \
2673                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2674                (LOI)->loi_read_lop.lop_num_pending,                      \
2675                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2676                args)
2677
2678 /* This is called by osc_check_rpcs() to find which objects have pages that
2679  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2680 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2681 {
2682         ENTRY;
2683
2684         /* First return objects that have blocked locks so that they
2685          * will be flushed quickly and other clients can get the lock,
2686          * then objects which have pages ready to be stuffed into RPCs */
2687         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2688                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2689                                       struct lov_oinfo, loi_hp_ready_item));
2690         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2691                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2692                                       struct lov_oinfo, loi_ready_item));
2693
2694         /* then if we have cache waiters, return all objects with queued
2695          * writes.  This is especially important when many small files
2696          * have filled up the cache and not been fired into rpcs because
2697          * they don't pass the nr_pending/object threshold */
2698         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2699             !cfs_list_empty(&cli->cl_loi_write_list))
2700                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2701                                       struct lov_oinfo, loi_write_item));
2702
2703         /* then return all queued objects when we have an invalid import
2704          * so that they get flushed */
2705         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2706                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2707                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2708                                               struct lov_oinfo,
2709                                               loi_write_item));
2710                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2711                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2712                                               struct lov_oinfo, loi_read_item));
2713         }
2714         RETURN(NULL);
2715 }
2716
2717 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2718 {
2719         struct osc_async_page *oap;
2720         int hprpc = 0;
2721
2722         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2723                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2724                                      struct osc_async_page, oap_urgent_item);
2725                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2726         }
2727
2728         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2729                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2730                                      struct osc_async_page, oap_urgent_item);
2731                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2732         }
2733
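             /* An urgent ASYNC_HP page queued on this object permits one
              * extra RPC beyond cl_max_rpcs_in_flight. */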
2734         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2735 }
2736
2737 /* called with the loi list lock held */
2738 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2739 {
2740         struct lov_oinfo *loi;
2741         int rc = 0, race_counter = 0;
2742         ENTRY;
2743
2744         while ((loi = osc_next_loi(cli)) != NULL) {
2745                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2746
2747                 if (osc_max_rpc_in_flight(cli, loi))
2748                         break;
2749
2750                 /* attempt some read/write balancing by alternating between
2751                  * reads and writes in an object.  The makes_rpc checks here
2752                  * would be redundant if we were getting read/write work items
2753                  * instead of objects.  We don't want send_oap_rpc to drain a
2754                  * partial read pending queue when we're given this object to
2755                  * do write io on while there are cache waiters */
2756                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2757                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2758                                               &loi->loi_write_lop);
2759                         if (rc < 0) {
2760                                 CERROR("Write request failed with %d\n", rc);
2761
2762                                 /* osc_send_oap_rpc failed, mostly because of
2763                                  * memory pressure.
2764                                  *
2765                                  * We can't simply break out here: if
2766                                  *  - a page was submitted by osc_io_submit
2767                                  *    (so the page is locked),
2768                                  *  - no request is in flight, and
2769                                  *  - no subsequent request is queued,
2770                                  * then the system would live-lock, since
2771                                  * there is no further chance to call
2772                                  * osc_io_unplug() or osc_check_rpcs().
2773                                  * pdflush can't help in this case either,
2774                                  * because it might be blocked grabbing the
2775                                  * page lock we mentioned above.
2776                                  *
2777                                  * Anyway, continue to drain pages. */
2778                                 /* break; */
2779                         }
2780
2781                         if (rc > 0)
2782                                 race_counter = 0;
2783                         else
2784                                 race_counter++;
2785                 }
2786                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2787                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2788                                               &loi->loi_read_lop);
2789                         if (rc < 0)
2790                                 CERROR("Read request failed with %d\n", rc);
2791
2792                         if (rc > 0)
2793                                 race_counter = 0;
2794                         else
2795                                 race_counter++;
2796                 }
2797
2798                 /* attempt some inter-object balancing by issuing rpcs
2799                  * for each object in turn */
2800                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2801                         cfs_list_del_init(&loi->loi_hp_ready_item);
2802                 if (!cfs_list_empty(&loi->loi_ready_item))
2803                         cfs_list_del_init(&loi->loi_ready_item);
2804                 if (!cfs_list_empty(&loi->loi_write_item))
2805                         cfs_list_del_init(&loi->loi_write_item);
2806                 if (!cfs_list_empty(&loi->loi_read_item))
2807                         cfs_list_del_init(&loi->loi_read_item);
2808
2809                 loi_list_maint(cli, loi);
2810
2811                 /* send_oap_rpc fails with 0 when make_ready tells it to
2812                  * back off.  llite's make_ready does this when it tries
2813                  * to lock a page queued for write that is already locked.
2814                  * we want to try sending rpcs from many objects, but we
2815                  * don't want to spin failing with 0.  */
2816                 if (race_counter == 10)
2817                         break;
2818         }
2819         EXIT;
2820 }
2821
2822 /* we're trying to queue a page in the osc so we're subject to the
2823  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2824  * If the osc's queued pages are already at that limit, then we want to sleep
2825  * until there is space in the osc's queue for us.  We also may be waiting for
2826  * write credits from the OST if there are RPCs in flight that may return some
2827  * before we fall back to sync writes.
2828  *
2829  * We need this to know whether our allocation was granted in the presence of signals */
2830 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2831 {
2832         int rc;
2833         ENTRY;
2834         client_obd_list_lock(&cli->cl_loi_list_lock);
2835         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2836         client_obd_list_unlock(&cli->cl_loi_list_lock);
2837         RETURN(rc);
2838 }
2839
2840 /**
2841  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2842  * is available.
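      * Returns 1 (and consumes grant for the page) when grant is available,
      * 0 otherwise.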
2843  */
2844 int osc_enter_cache_try(const struct lu_env *env,
2845                         struct client_obd *cli, struct lov_oinfo *loi,
2846                         struct osc_async_page *oap, int transient)
2847 {
2848         int has_grant;
2849
2850         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2851         if (has_grant) {
2852                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2853                 if (transient) {
2854                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2855                         cfs_atomic_inc(&obd_dirty_transit_pages);
2856                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2857                 }
2858         }
2859         return has_grant;
2860 }
2861
2862 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2863  * grant or cache space. */
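     /* Returns 0 on success, -EDQUOT when the caller must fall back to sync
      * i/o, -EINTR if interrupted while still queued as a waiter, or the rc
      * recorded in ocw_rc once the waiter has been handled. */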
2864 static int osc_enter_cache(const struct lu_env *env,
2865                            struct client_obd *cli, struct lov_oinfo *loi,
2866                            struct osc_async_page *oap)
2867 {
2868         struct osc_cache_waiter ocw;
2869         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2870
2871         ENTRY;
2872
2873         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2874                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2875                cli->cl_dirty_max, obd_max_dirty_pages,
2876                cli->cl_lost_grant, cli->cl_avail_grant);
2877
2878         /* force the caller to try sync io.  this can jump the list
2879          * of queued writes and create a discontiguous rpc stream */
2880         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2881             loi->loi_ar.ar_force_sync)
2882                 RETURN(-EDQUOT);
2883
2884         /* Hopefully normal case - cache space and write credits available */
2885         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2886             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2887             osc_enter_cache_try(env, cli, loi, oap, 0))
2888                 RETURN(0);
2889
2890         /* It is safe to block as a cache waiter as long as there is grant
2891          * space available or the hope of additional grant being returned
2892          * when an in flight write completes.  Using the write back cache
2893          * if possible is preferable to sending the data synchronously
2894          * because write pages can then be merged in to large requests.
2895          * The addition of this cache waiter will cause pending write
2896          * pages to be sent immediately. */
2897         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2898                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2899                 cfs_waitq_init(&ocw.ocw_waitq);
2900                 ocw.ocw_oap = oap;
2901                 ocw.ocw_rc = 0;
2902
2903                 loi_list_maint(cli, loi);
2904                 osc_check_rpcs(env, cli);
2905                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2906
2907                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2908                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2909
2910                 client_obd_list_lock(&cli->cl_loi_list_lock);
2911                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2912                         cfs_list_del(&ocw.ocw_entry);
2913                         RETURN(-EINTR);
2914                 }
2915                 RETURN(ocw.ocw_rc);
2916         }
2917
2918         RETURN(-EDQUOT);
2919 }
2920
2921
2922 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2923                         struct lov_oinfo *loi, cfs_page_t *page,
2924                         obd_off offset, const struct obd_async_page_ops *ops,
2925                         void *data, void **res, int nocache,
2926                         struct lustre_handle *lockh)
2927 {
2928         struct osc_async_page *oap;
2929
2930         ENTRY;
2931
2932         if (!page)
2933                 return cfs_size_round(sizeof(*oap));
2934
2935         oap = *res;
2936         oap->oap_magic = OAP_MAGIC;
2937         oap->oap_cli = &exp->exp_obd->u.cli;
2938         oap->oap_loi = loi;
2939
2940         oap->oap_caller_ops = ops;
2941         oap->oap_caller_data = data;
2942
2943         oap->oap_page = page;
2944         oap->oap_obj_off = offset;
2945         if (!client_is_remote(exp) &&
2946             cfs_capable(CFS_CAP_SYS_RESOURCE))
2947                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2948
2949         LASSERT(!(offset & ~CFS_PAGE_MASK));
2950
2951         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2952         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2953         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2954         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2955
2956         cfs_spin_lock_init(&oap->oap_lock);
2957         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2958         RETURN(0);
2959 }
2960
2961 struct osc_async_page *oap_from_cookie(void *cookie)
2962 {
2963         struct osc_async_page *oap = cookie;
2964         if (oap->oap_magic != OAP_MAGIC)
2965                 return ERR_PTR(-EINVAL);
2966         return oap;
2967 }
2968
2969 int osc_queue_async_io(const struct lu_env *env,
2970                        struct obd_export *exp, struct lov_stripe_md *lsm,
2971                        struct lov_oinfo *loi, void *cookie,
2972                        int cmd, obd_off off, int count,
2973                        obd_flag brw_flags, enum async_flags async_flags)
2974 {
2975         struct client_obd *cli = &exp->exp_obd->u.cli;
2976         struct osc_async_page *oap;
2977         int rc = 0;
2978         ENTRY;
2979
2980         oap = oap_from_cookie(cookie);
2981         if (IS_ERR(oap))
2982                 RETURN(PTR_ERR(oap));
2983
2984         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2985                 RETURN(-EIO);
2986
2987         if (!cfs_list_empty(&oap->oap_pending_item) ||
2988             !cfs_list_empty(&oap->oap_urgent_item) ||
2989             !cfs_list_empty(&oap->oap_rpc_item))
2990                 RETURN(-EBUSY);
2991
2992         /* check if the file's owner/group is over quota */
2993         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2994                 struct cl_object *obj;
2995                 struct cl_attr    attr; /* XXX put attr into thread info */
2996                 unsigned int qid[MAXQUOTAS];
2997
2998                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2999
3000                 cl_object_attr_lock(obj);
3001                 rc = cl_object_attr_get(env, obj, &attr);
3002                 cl_object_attr_unlock(obj);
3003
3004                 qid[USRQUOTA] = attr.cat_uid;
3005                 qid[GRPQUOTA] = attr.cat_gid;
3006                 if (rc == 0 &&
3007                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
3008                         rc = -EDQUOT;
3009                 if (rc)
3010                         RETURN(rc);
3011         }
3012
3013         if (loi == NULL)
3014                 loi = lsm->lsm_oinfo[0];
3015
3016         client_obd_list_lock(&cli->cl_loi_list_lock);
3017
3018         LASSERT(off + count <= CFS_PAGE_SIZE);
3019         oap->oap_cmd = cmd;
3020         oap->oap_page_off = off;
3021         oap->oap_count = count;
3022         oap->oap_brw_flags = brw_flags;
3023         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3024         if (cfs_memory_pressure_get())
3025                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3026         cfs_spin_lock(&oap->oap_lock);
3027         oap->oap_async_flags = async_flags;
3028         cfs_spin_unlock(&oap->oap_lock);
3029
3030         if (cmd & OBD_BRW_WRITE) {
3031                 rc = osc_enter_cache(env, cli, loi, oap);
3032                 if (rc) {
3033                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3034                         RETURN(rc);
3035                 }
3036         }
3037
3038         osc_oap_to_pending(oap);
3039         loi_list_maint(cli, loi);
3040
3041         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3042                   cmd);
3043
3044         osc_check_rpcs(env, cli);
3045         client_obd_list_unlock(&cli->cl_loi_list_lock);
3046
3047         RETURN(0);
3048 }
3049
3050 /* aka (~was & now & flag), but this is more clear :) */
3051 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
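     /* e.g. SETTING(0, ASYNC_URGENT, ASYNC_URGENT) is true, while
      * SETTING(ASYNC_URGENT, ASYNC_URGENT, ASYNC_URGENT) is false: it only
      * fires on a 0 -> 1 transition of 'flag'. */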
3052
3053 int osc_set_async_flags_base(struct client_obd *cli,
3054                              struct lov_oinfo *loi, struct osc_async_page *oap,
3055                              obd_flag async_flags)
3056 {
3057         struct loi_oap_pages *lop;
3058         int flags = 0;
3059         ENTRY;
3060
3061         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3062
3063         if (oap->oap_cmd & OBD_BRW_WRITE) {
3064                 lop = &loi->loi_write_lop;
3065         } else {
3066                 lop = &loi->loi_read_lop;
3067         }
3068
3069         if ((oap->oap_async_flags & async_flags) == async_flags)
3070                 RETURN(0);
3071
3072         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3073                 flags |= ASYNC_READY;
3074
3075         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3076             cfs_list_empty(&oap->oap_rpc_item)) {
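                     /* ASYNC_HP pages go to the head of the urgent list so
                      * they are serviced before ordinary urgent pages. */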
3077                 if (oap->oap_async_flags & ASYNC_HP)
3078                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3079                 else
3080                         cfs_list_add_tail(&oap->oap_urgent_item,
3081                                           &lop->lop_urgent);
3082                 flags |= ASYNC_URGENT;
3083                 loi_list_maint(cli, loi);
3084         }
3085         cfs_spin_lock(&oap->oap_lock);
3086         oap->oap_async_flags |= flags;
3087         cfs_spin_unlock(&oap->oap_lock);
3088
3089         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3090                         oap->oap_async_flags);
3091         RETURN(0);
3092 }
3093
3094 int osc_teardown_async_page(struct obd_export *exp,
3095                             struct lov_stripe_md *lsm,
3096                             struct lov_oinfo *loi, void *cookie)
3097 {
3098         struct client_obd *cli = &exp->exp_obd->u.cli;
3099         struct loi_oap_pages *lop;
3100         struct osc_async_page *oap;
3101         int rc = 0;
3102         ENTRY;
3103
3104         oap = oap_from_cookie(cookie);
3105         if (IS_ERR(oap))
3106                 RETURN(PTR_ERR(oap));
3107
3108         if (loi == NULL)
3109                 loi = lsm->lsm_oinfo[0];
3110
3111         if (oap->oap_cmd & OBD_BRW_WRITE) {
3112                 lop = &loi->loi_write_lop;
3113         } else {
3114                 lop = &loi->loi_read_lop;
3115         }
3116
3117         client_obd_list_lock(&cli->cl_loi_list_lock);
3118
3119         if (!cfs_list_empty(&oap->oap_rpc_item))
3120                 GOTO(out, rc = -EBUSY);
3121
3122         osc_exit_cache(cli, oap, 0);
3123         osc_wake_cache_waiters(cli);
3124
3125         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3126                 cfs_list_del_init(&oap->oap_urgent_item);
3127                 cfs_spin_lock(&oap->oap_lock);
3128                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3129                 cfs_spin_unlock(&oap->oap_lock);
3130         }
3131         if (!cfs_list_empty(&oap->oap_pending_item)) {
3132                 cfs_list_del_init(&oap->oap_pending_item);
3133                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3134         }
3135         loi_list_maint(cli, loi);
3136         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3137 out:
3138         client_obd_list_unlock(&cli->cl_loi_list_lock);
3139         RETURN(rc);
3140 }
3141
3142 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
3143                                          struct ldlm_enqueue_info *einfo,
3144                                          int flags)
3145 {
3146         void *data = einfo->ei_cbdata;
3147
3148         LASSERT(lock != NULL);
3149         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3150         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3151         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3152         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3153
3154         lock_res_and_lock(lock);
3155         cfs_spin_lock(&osc_ast_guard);
3156         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
3157         lock->l_ast_data = data;
3158         cfs_spin_unlock(&osc_ast_guard);
3159         unlock_res_and_lock(lock);
3160 }
3161
3162 static void osc_set_data_with_check(struct lustre_handle *lockh,
3163                                     struct ldlm_enqueue_info *einfo,
3164                                     int flags)
3165 {
3166         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3167
3168         if (lock != NULL) {
3169                 osc_set_lock_data_with_check(lock, einfo, flags);
3170                 LDLM_LOCK_PUT(lock);
3171         } else
3172                 CERROR("lockh %p, data %p - client evicted?\n",
3173                        lockh, einfo->ei_cbdata);
3174 }
3175
3176 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3177                              ldlm_iterator_t replace, void *data)
3178 {
3179         struct ldlm_res_id res_id;
3180         struct obd_device *obd = class_exp2obd(exp);
3181
3182         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3183         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3184         return 0;
3185 }
3186
3187 /* Find any ldlm lock of the inode in osc.
3188  * Return 0    if no lock is found,
3189  *        1    if one is found,
3190  *      < 0    on error. */
3191 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3192                            ldlm_iterator_t replace, void *data)
3193 {
3194         struct ldlm_res_id res_id;
3195         struct obd_device *obd = class_exp2obd(exp);
3196         int rc = 0;
3197
3198         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3199         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3200         if (rc == LDLM_ITER_STOP)
3201                 return(1);
3202         if (rc == LDLM_ITER_CONTINUE)
3203                 return(0);
3204         return(rc);
3205 }
3206
3207 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3208                             obd_enqueue_update_f upcall, void *cookie,
3209                             int *flags, int rc)
3210 {
3211         int intent = *flags & LDLM_FL_HAS_INTENT;
3212         ENTRY;
3213
3214         if (intent) {
3215                 /* The request was created before the ldlm_cli_enqueue() call. */
3216                 if (rc == ELDLM_LOCK_ABORTED) {
3217                         struct ldlm_reply *rep;
3218                         rep = req_capsule_server_get(&req->rq_pill,
3219                                                      &RMF_DLM_REP);
3220
3221                         LASSERT(rep != NULL);
3222                         if (rep->lock_policy_res1)
3223                                 rc = rep->lock_policy_res1;
3224                 }
3225         }
3226
3227         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3228                 *flags |= LDLM_FL_LVB_READY;
3229                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3230                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3231         }
3232
3233         /* Call the update callback. */
3234         rc = (*upcall)(cookie, rc);
3235         RETURN(rc);
3236 }
3237
3238 static int osc_enqueue_interpret(const struct lu_env *env,
3239                                  struct ptlrpc_request *req,
3240                                  struct osc_enqueue_args *aa, int rc)
3241 {
3242         struct ldlm_lock *lock;
3243         struct lustre_handle handle;
3244         __u32 mode;
3245
3246         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3247          * might be freed anytime after the lock upcall has been called. */
3248         lustre_handle_copy(&handle, aa->oa_lockh);
3249         mode = aa->oa_ei->ei_mode;
3250
3251         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3252          * be valid. */
3253         lock = ldlm_handle2lock(&handle);
3254
3255         /* Take an additional reference so that a blocking AST that
3256          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3257          * to arrive after an upcall has been executed by
3258          * osc_enqueue_fini(). */
3259         ldlm_lock_addref(&handle, mode);
3260
3261         /* Let the CP AST grant the lock first. */
3262         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3263
3264         /* Complete the procedure for obtaining the lock. */
3265         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3266                                    mode, aa->oa_flags, aa->oa_lvb,
3267                                    sizeof(*aa->oa_lvb), &handle, rc);
3268         /* Complete osc stuff. */
3269         rc = osc_enqueue_fini(req, aa->oa_lvb,
3270                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3271
3272         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3273
3274         /* Release the lock for async request. */
3275         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3276                 /*
3277                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3278                  * not already released by
3279                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3280                  */
3281                 ldlm_lock_decref(&handle, mode);
3282
3283         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3284                  aa->oa_lockh, req, aa);
3285         ldlm_lock_decref(&handle, mode);
3286         LDLM_LOCK_PUT(lock);
3287         return rc;
3288 }
3289
3290 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3291                         struct lov_oinfo *loi, int flags,
3292                         struct ost_lvb *lvb, __u32 mode, int rc)
3293 {
3294         if (rc == ELDLM_OK) {
3295                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3296                 __u64 tmp;
3297
3298                 LASSERT(lock != NULL);
3299                 loi->loi_lvb = *lvb;
3300                 tmp = loi->loi_lvb.lvb_size;
3301                 /* Extend KMS up to the end of this lock and no further
3302                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
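                     /* e.g. a granted lock on [0, 4095] allows a kms of
                      * 4096. */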
3303                 if (tmp > lock->l_policy_data.l_extent.end)
3304                         tmp = lock->l_policy_data.l_extent.end + 1;
3305                 if (tmp >= loi->loi_kms) {
3306                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3307                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3308                         loi_kms_set(loi, tmp);
3309                 } else {
3310                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3311                                    LPU64"; leaving kms="LPU64", end="LPU64,
3312                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3313                                    lock->l_policy_data.l_extent.end);
3314                 }
3315                 ldlm_lock_allow_match(lock);
3316                 LDLM_LOCK_PUT(lock);
3317         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3318                 loi->loi_lvb = *lvb;
3319                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3320                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3321                 rc = ELDLM_OK;
3322         }
3323 }
3324 EXPORT_SYMBOL(osc_update_enqueue);
3325
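     /* Magic sentinel: pass PTLRPCD_SET as the request set to have the
      * request handled by ptlrpcd instead of a caller-owned set (see the
      * rqset == PTLRPCD_SET check in osc_enqueue_base() below). */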
3326 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3327
3328 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3329  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3330  * other synchronous requests, however keeping some locks and trying to obtain
3331  * others may take a considerable amount of time in the case of an OST failure;
3332  * and when other sync requests do not get a lock released from a client, the
3333  * client is excluded from the cluster -- such scenarios make life difficult,
3334  * so release locks just after they are obtained. */
3335 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3336                      int *flags, ldlm_policy_data_t *policy,
3337                      struct ost_lvb *lvb, int kms_valid,
3338                      obd_enqueue_update_f upcall, void *cookie,
3339                      struct ldlm_enqueue_info *einfo,
3340                      struct lustre_handle *lockh,
3341                      struct ptlrpc_request_set *rqset, int async)
3342 {
3343         struct obd_device *obd = exp->exp_obd;
3344         struct ptlrpc_request *req = NULL;
3345         int intent = *flags & LDLM_FL_HAS_INTENT;
3346         ldlm_mode_t mode;
3347         int rc;
3348         ENTRY;
3349
3350         /* Filesystem lock extents are extended to page boundaries so that
3351          * dealing with the page cache is a little smoother.  */
3352         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3353         policy->l_extent.end |= ~CFS_PAGE_MASK;
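             /* e.g. with 4 KiB pages, an extent of [5000, 6000] becomes
              * [4096, 8191]. */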
3354
3355         /*
3356          * kms is not valid when either object is completely fresh (so that no
3357          * locks are cached), or the object was evicted. In the latter case a cached
3358          * lock cannot be used, because it would prime inode state with
3359          * potentially stale LVB.
3360          */
3361         if (!kms_valid)
3362                 goto no_match;
3363
3364         /* Next, search for already existing extent locks that will cover us */
3365         /* If we're trying to read, we also search for an existing PW lock.  The
3366          * VFS and page cache already protect us locally, so lots of readers/
3367          * writers can share a single PW lock.
3368          *
3369          * There are problems with conversion deadlocks, so instead of
3370          * converting a read lock to a write lock, we'll just enqueue a new
3371          * one.
3372          *
3373          * At some point we should cancel the read lock instead of making them
3374          * send us a blocking callback, but there are problems with canceling
3375          * locks out from other users right now, too. */
3376         mode = einfo->ei_mode;
3377         if (einfo->ei_mode == LCK_PR)
3378                 mode |= LCK_PW;
3379         mode = ldlm_lock_match(obd->obd_namespace,
3380                                *flags | LDLM_FL_LVB_READY, res_id,
3381                                einfo->ei_type, policy, mode, lockh, 0);
3382         if (mode) {
3383                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3384
3385                 if (matched->l_ast_data == NULL ||
3386                     matched->l_ast_data == einfo->ei_cbdata) {
3387                         /* addref the lock only if this is not an async request
3388                          * and a PW lock was matched whereas we asked for PR. */
3389                         if (!rqset && einfo->ei_mode != mode)
3390                                 ldlm_lock_addref(lockh, LCK_PR);
3391                         osc_set_lock_data_with_check(matched, einfo, *flags);
3392                         if (intent) {
3393                                 /* I would like to be able to ASSERT here that
3394                                  * rss <= kms, but I can't, for reasons which
3395                                  * are explained in lov_enqueue() */
3396                         }
3397
3398                         /* We already have a lock, and it's referenced */
3399                         (*upcall)(cookie, ELDLM_OK);
3400
3401                         /* For async requests, decref the lock. */
3402                         if (einfo->ei_mode != mode)
3403                                 ldlm_lock_decref(lockh, LCK_PW);
3404                         else if (rqset)
3405                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3406                         LDLM_LOCK_PUT(matched);
3407                         RETURN(ELDLM_OK);
3408                 } else
3409                         ldlm_lock_decref(lockh, mode);
3410                 LDLM_LOCK_PUT(matched);
3411         }
3412
3413  no_match:
3414         if (intent) {
3415                 CFS_LIST_HEAD(cancels);
3416                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3417                                            &RQF_LDLM_ENQUEUE_LVB);
3418                 if (req == NULL)
3419                         RETURN(-ENOMEM);
3420
3421                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3422                 if (rc) {
3423                         ptlrpc_request_free(req);
3424                         RETURN(rc);
3425                 }
3426
3427                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3428                                      sizeof *lvb);
3429                 ptlrpc_request_set_replen(req);
3430         }
3431
3432         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3433         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3434
3435         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3436                               sizeof(*lvb), lockh, async);
3437         if (rqset) {
3438                 if (!rc) {
3439                         struct osc_enqueue_args *aa;
3440                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3441                         aa = ptlrpc_req_async_args(req);
3442                         aa->oa_ei = einfo;
3443                         aa->oa_exp = exp;
3444                         aa->oa_flags  = flags;
3445                         aa->oa_upcall = upcall;
3446                         aa->oa_cookie = cookie;
3447                         aa->oa_lvb    = lvb;
3448                         aa->oa_lockh  = lockh;
3449
3450                         req->rq_interpret_reply =
3451                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3452                         if (rqset == PTLRPCD_SET)
3453                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3454                         else
3455                                 ptlrpc_set_add_req(rqset, req);
3456                 } else if (intent) {
3457                         ptlrpc_req_finished(req);
3458                 }
3459                 RETURN(rc);
3460         }
3461
3462         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3463         if (intent)
3464                 ptlrpc_req_finished(req);
3465
3466         RETURN(rc);
3467 }
3468
3469 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3470                        struct ldlm_enqueue_info *einfo,
3471                        struct ptlrpc_request_set *rqset)
3472 {
3473         struct ldlm_res_id res_id;
3474         int rc;
3475         ENTRY;
3476
3477         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3478                            oinfo->oi_md->lsm_object_seq, &res_id);
3479
3480         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3481                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3482                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3483                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3484                               rqset, rqset != NULL);
3485         RETURN(rc);
3486 }
3487
3488 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3489                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3490                    int *flags, void *data, struct lustre_handle *lockh,
3491                    int unref)
3492 {
3493         struct obd_device *obd = exp->exp_obd;
3494         int lflags = *flags;
3495         ldlm_mode_t rc;
3496         ENTRY;
3497
3498         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3499                 RETURN(-EIO);
3500
3501         /* Filesystem lock extents are extended to page boundaries so that
3502          * dealing with the page cache is a little smoother */
3503         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3504         policy->l_extent.end |= ~CFS_PAGE_MASK;
3505
3506         /* Next, search for already existing extent locks that will cover us */
3507         /* If we're trying to read, we also search for an existing PW lock.  The
3508          * VFS and page cache already protect us locally, so lots of readers/
3509          * writers can share a single PW lock. */
3510         rc = mode;
3511         if (mode == LCK_PR)
3512                 rc |= LCK_PW;
3513         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3514                              res_id, type, policy, rc, lockh, unref);
3515         if (rc) {
3516                 if (data != NULL)
3517                         osc_set_data_with_check(lockh, data, lflags);
3518                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3519                         ldlm_lock_addref(lockh, LCK_PR);
3520                         ldlm_lock_decref(lockh, LCK_PW);
3521                 }
3522                 RETURN(rc);
3523         }
3524         RETURN(rc);
3525 }
3526
3527 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3528 {
3529         ENTRY;
3530
3531         if (unlikely(mode == LCK_GROUP))
3532                 ldlm_lock_decref_and_cancel(lockh, mode);
3533         else
3534                 ldlm_lock_decref(lockh, mode);
3535
3536         RETURN(0);
3537 }
3538
3539 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3540                       __u32 mode, struct lustre_handle *lockh)
3541 {
3542         ENTRY;
3543         RETURN(osc_cancel_base(lockh, mode));
3544 }
3545
3546 static int osc_cancel_unused(struct obd_export *exp,
3547                              struct lov_stripe_md *lsm,
3548                              ldlm_cancel_flags_t flags,
3549                              void *opaque)
3550 {
3551         struct obd_device *obd = class_exp2obd(exp);
3552         struct ldlm_res_id res_id, *resp = NULL;
3553
3554         if (lsm != NULL) {
3555                 resp = osc_build_res_name(lsm->lsm_object_id,
3556                                           lsm->lsm_object_seq, &res_id);
3557         }
3558
3559         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3560 }
3561
3562 static int osc_statfs_interpret(const struct lu_env *env,
3563                                 struct ptlrpc_request *req,
3564                                 struct osc_async_args *aa, int rc)
3565 {
3566         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3567         struct obd_statfs *msfs;
3568         __u64 used;
3569         ENTRY;
3570
3571         if (rc == -EBADR)
3572                 /* The request has in fact never been sent
3573                  * due to issues at a higher level (LOV).
3574                  * Exit immediately since the caller is
3575                  * aware of the problem and takes care
3576                  * of the clean up */
3577                  RETURN(rc);
3578
3579         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3580             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3581                 GOTO(out, rc = 0);
3582
3583         if (rc != 0)
3584                 GOTO(out, rc);
3585
3586         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3587         if (msfs == NULL) {
3588                 GOTO(out, rc = -EPROTO);
3589         }
3590
3591         /* Reinitialize the RDONLY and DEGRADED flags at the client
3592          * on each statfs, so they don't stay set permanently. */
3593         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3594
3595         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3596                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3597         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3598                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3599
3600         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3601                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3602         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3603                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3604
3605         /* Add a bit of hysteresis so this flag isn't continually flapping,
3606          * and ensure that new files don't get extremely fragmented due to
3607          * only a small amount of available space in the filesystem.
3608          * We want to set the NOSPC flag when there is less than ~0.1% free
3609          * and clear it when there is at least ~0.2% free space, so:
3610          *                   avail < ~0.1% max          max = avail + used
3611          *            1025 * avail < avail + used       used = blocks - free
3612          *            1024 * avail < used
3613          *            1024 * avail < blocks - free
3614          *                   avail < ((blocks - free) >> 10)
3615          *
3616          * On a very large disk, say 16TB, 0.1% will be 16 GB.  We don't want
3617          * to lose that amount of space, so in those cases we report no space
3618          * left if there is less than 1 GB left. */
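             /* e.g. with os_blocks = 2^30 and os_bfree = 2^29:
              *   used = min((2^30 - 2^29) >> 10, 1 << 30) = 2^19,
              * so NOSPC is set once os_bavail drops below 2^19 blocks (or
              * os_ffree < 32), and cleared again only when os_bavail exceeds
              * 2^20 blocks and os_ffree > 64. */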
3619         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3620         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3621                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3622                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3623         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3624                 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3625                         cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3626
3627         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3628
3629         *aa->aa_oi->oi_osfs = *msfs;
3630 out:
3631         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3632         RETURN(rc);
3633 }
3634
3635 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3636                             __u64 max_age, struct ptlrpc_request_set *rqset)
3637 {
3638         struct ptlrpc_request *req;
3639         struct osc_async_args *aa;
3640         int                    rc;
3641         ENTRY;
3642
3643         /* We could possibly pass max_age in the request (as an absolute
3644          * timestamp or a "seconds.usec ago") so the target can avoid doing
3645          * extra calls into the filesystem if that isn't necessary (e.g.
3646          * during mount that would help a bit).  Having relative timestamps
3647          * is not so great if request processing is slow, while absolute
3648          * timestamps are not ideal because they need time synchronization. */
3649         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3650         if (req == NULL)
3651                 RETURN(-ENOMEM);
3652
3653         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3654         if (rc) {
3655                 ptlrpc_request_free(req);
3656                 RETURN(rc);
3657         }
3658         ptlrpc_request_set_replen(req);
3659         req->rq_request_portal = OST_CREATE_PORTAL;
3660         ptlrpc_at_set_req_timeout(req);
3661
3662         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3663                 /* procfs requests must not be delayed or resent, to avoid deadlock */
3664                 req->rq_no_resend = 1;
3665                 req->rq_no_delay = 1;
3666         }
3667
3668         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3669         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3670         aa = ptlrpc_req_async_args(req);
3671         aa->aa_oi = oinfo;
3672
3673         ptlrpc_set_add_req(rqset, req);
3674         RETURN(0);
3675 }
3676
3677 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3678                       __u64 max_age, __u32 flags)
3679 {
3680         struct obd_statfs     *msfs;
3681         struct ptlrpc_request *req;
3682         struct obd_import     *imp = NULL;
3683         int rc;
3684         ENTRY;
3685
3686         /* Since the request might also come from lprocfs, we need to
3687          * sync this with client_disconnect_export() (bug 15684). */
3688         cfs_down_read(&obd->u.cli.cl_sem);
3689         if (obd->u.cli.cl_import)
3690                 imp = class_import_get(obd->u.cli.cl_import);
3691         cfs_up_read(&obd->u.cli.cl_sem);
3692         if (!imp)
3693                 RETURN(-ENODEV);
3694
3695         /* We could possibly pass max_age in the request (as an absolute
3696          * timestamp or a "seconds.usec ago") so the target can avoid doing
3697          * extra calls into the filesystem if that isn't necessary (e.g.
3698          * during mount that would help a bit).  Having relative timestamps
3699          * is not so great if request processing is slow, while absolute
3700          * timestamps are not ideal because they need time synchronization. */
3701         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3702
3703         class_import_put(imp);
3704
3705         if (req == NULL)
3706                 RETURN(-ENOMEM);
3707
3708         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3709         if (rc) {
3710                 ptlrpc_request_free(req);
3711                 RETURN(rc);
3712         }
3713         ptlrpc_request_set_replen(req);
3714         req->rq_request_portal = OST_CREATE_PORTAL;
3715         ptlrpc_at_set_req_timeout(req);
3716
3717         if (flags & OBD_STATFS_NODELAY) {
3718                 /* procfs requests must not be delayed or resent, to avoid deadlock */
3719                 req->rq_no_resend = 1;
3720                 req->rq_no_delay = 1;
3721         }
3722
3723         rc = ptlrpc_queue_wait(req);
3724         if (rc)
3725                 GOTO(out, rc);
3726
3727         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3728         if (msfs == NULL) {
3729                 GOTO(out, rc = -EPROTO);
3730         }
3731
3732         *osfs = *msfs;
3733
3734         EXIT;
3735  out:
3736         ptlrpc_req_finished(req);
3737         return rc;
3738 }
3739
3740 /* Retrieve object striping information.
3741  *
3742  * @lump is a pointer to an in-core struct with lmm_ost_count indicating
3743  * the maximum number of OST indices which will fit in the user buffer.
3744  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3745  */
3746 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3747 {
3748         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3749         struct lov_user_md_v3 lum, *lumk;
3750         struct lov_user_ost_data_v1 *lmm_objects;
3751         int rc = 0, lum_size;
3752         ENTRY;
3753
3754         if (!lsm)
3755                 RETURN(-ENODATA);
3756
3757         /* we only need the header part from user space to get lmm_magic and
3758          * lmm_stripe_count (the header part is common to v1 and v3) */
3759         lum_size = sizeof(struct lov_user_md_v1);
3760         if (cfs_copy_from_user(&lum, lump, lum_size))
3761                 RETURN(-EFAULT);
3762
3763         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3764             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3765                 RETURN(-EINVAL);
3766
3767         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3768         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3769         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3770         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3771
3772         /* we can use lov_mds_md_size() to compute lum_size
3773          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3774         if (lum.lmm_stripe_count > 0) {
3775                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3776                 OBD_ALLOC(lumk, lum_size);
3777                 if (!lumk)
3778                         RETURN(-ENOMEM);
3779
3780                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3781                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3782                 else
3783                         lmm_objects = &(lumk->lmm_objects[0]);
3784                 lmm_objects->l_object_id = lsm->lsm_object_id;
3785         } else {
3786                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3787                 lumk = &lum;
3788         }
3789
3790         lumk->lmm_object_id = lsm->lsm_object_id;
3791         lumk->lmm_object_seq = lsm->lsm_object_seq;
3792         lumk->lmm_stripe_count = 1;
3793
3794         if (cfs_copy_to_user(lump, lumk, lum_size))
3795                 rc = -EFAULT;
3796
3797         if (lumk != &lum)
3798                 OBD_FREE(lumk, lum_size);
3799
3800         RETURN(rc);
3801 }
3802
3803
3804 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3805                          void *karg, void *uarg)
3806 {
3807         struct obd_device *obd = exp->exp_obd;
3808         struct obd_ioctl_data *data = karg;
3809         int err = 0;
3810         ENTRY;
3811
3812         if (!cfs_try_module_get(THIS_MODULE)) {
3813                 CERROR("Can't get module. Is it alive?\n");
3814                 return -EINVAL;
3815         }
3816         switch (cmd) {
3817         case OBD_IOC_LOV_GET_CONFIG: {
3818                 char *buf;
3819                 struct lov_desc *desc;
3820                 struct obd_uuid uuid;
3821
3822                 buf = NULL;
3823                 len = 0;
3824                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3825                         GOTO(out, err = -EINVAL);
3826
3827                 data = (struct obd_ioctl_data *)buf;
3828
3829                 if (sizeof(*desc) > data->ioc_inllen1) {
3830                         obd_ioctl_freedata(buf, len);
3831                         GOTO(out, err = -EINVAL);
3832                 }
3833
3834                 if (data->ioc_inllen2 < sizeof(uuid)) {
3835                         obd_ioctl_freedata(buf, len);
3836                         GOTO(out, err = -EINVAL);
3837                 }
3838
3839                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3840                 desc->ld_tgt_count = 1;
3841                 desc->ld_active_tgt_count = 1;
3842                 desc->ld_default_stripe_count = 1;
3843                 desc->ld_default_stripe_size = 0;
3844                 desc->ld_default_stripe_offset = 0;
3845                 desc->ld_pattern = 0;
3846                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3847
3848                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3849
3850                 err = cfs_copy_to_user((void *)uarg, buf, len);
3851                 if (err)
3852                         err = -EFAULT;
3853                 obd_ioctl_freedata(buf, len);
3854                 GOTO(out, err);
3855         }
3856         case LL_IOC_LOV_SETSTRIPE:
3857                 err = obd_alloc_memmd(exp, karg);
3858                 if (err > 0)
3859                         err = 0;
3860                 GOTO(out, err);
3861         case LL_IOC_LOV_GETSTRIPE:
3862                 err = osc_getstripe(karg, uarg);
3863                 GOTO(out, err);
3864         case OBD_IOC_CLIENT_RECOVER:
3865                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3866                                             data->ioc_inlbuf1);
3867                 if (err > 0)
3868                         err = 0;
3869                 GOTO(out, err);
3870         case IOC_OSC_SET_ACTIVE:
3871                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3872                                                data->ioc_offset);
3873                 GOTO(out, err);
3874         case OBD_IOC_POLL_QUOTACHECK:
3875                 err = lquota_poll_check(quota_interface, exp,
3876                                         (struct if_quotacheck *)karg);
3877                 GOTO(out, err);
3878         case OBD_IOC_PING_TARGET:
3879                 err = ptlrpc_obd_ping(obd);
3880                 GOTO(out, err);
3881         default:
3882                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3883                        cmd, cfs_curproc_comm());
3884                 GOTO(out, err = -ENOTTY);
3885         }
3886 out:
3887         cfs_module_put(THIS_MODULE);
3888         return err;
3889 }
3890
3891 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3892                         void *key, __u32 *vallen, void *val,
3893                         struct lov_stripe_md *lsm)
3894 {
3895         ENTRY;
3896         if (!vallen || !val)
3897                 RETURN(-EFAULT);
3898
3899         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3900                 __u32 *stripe = val;
3901                 *vallen = sizeof(*stripe);
3902                 *stripe = 0;
3903                 RETURN(0);
3904         } else if (KEY_IS(KEY_LAST_ID)) {
3905                 struct ptlrpc_request *req;
3906                 obd_id                *reply;
3907                 char                  *tmp;
3908                 int                    rc;
3909
3910                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3911                                            &RQF_OST_GET_INFO_LAST_ID);
3912                 if (req == NULL)
3913                         RETURN(-ENOMEM);
3914
3915                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3916                                      RCL_CLIENT, keylen);
3917                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3918                 if (rc) {
3919                         ptlrpc_request_free(req);
3920                         RETURN(rc);
3921                 }
3922
3923                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3924                 memcpy(tmp, key, keylen);
3925
3926                 req->rq_no_delay = req->rq_no_resend = 1;
3927                 ptlrpc_request_set_replen(req);
3928                 rc = ptlrpc_queue_wait(req);
3929                 if (rc)
3930                         GOTO(out, rc);
3931
3932                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3933                 if (reply == NULL)
3934                         GOTO(out, rc = -EPROTO);
3935
3936                 *((obd_id *)val) = *reply;
3937         out:
3938                 ptlrpc_req_finished(req);
3939                 RETURN(rc);
3940         } else if (KEY_IS(KEY_FIEMAP)) {
3941                 struct ptlrpc_request *req;
3942                 struct ll_user_fiemap *reply;
3943                 char *tmp;
3944                 int rc;
3945
3946                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3947                                            &RQF_OST_GET_INFO_FIEMAP);
3948                 if (req == NULL)
3949                         RETURN(-ENOMEM);
3950
3951                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3952                                      RCL_CLIENT, keylen);
3953                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3954                                      RCL_CLIENT, *vallen);
3955                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3956                                      RCL_SERVER, *vallen);
3957
3958                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3959                 if (rc) {
3960                         ptlrpc_request_free(req);
3961                         RETURN(rc);
3962                 }
3963
3964                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3965                 memcpy(tmp, key, keylen);
3966                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3967                 memcpy(tmp, val, *vallen);
3968
3969                 ptlrpc_request_set_replen(req);
3970                 rc = ptlrpc_queue_wait(req);
3971                 if (rc)
3972                         GOTO(out1, rc);
3973
3974                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3975                 if (reply == NULL)
3976                         GOTO(out1, rc = -EPROTO);
3977
3978                 memcpy(val, reply, *vallen);
3979         out1:
3980                 ptlrpc_req_finished(req);
3981
3982                 RETURN(rc);
3983         }
3984
3985         RETURN(-EINVAL);
3986 }
3987
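/* Once an MDS connection is established on this import, connect the
 * MDS->OST llog initiator and mark the import pingable with server
 * timeouts enabled, so the MDS-side OSC keeps pinging the OST. */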
3988 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3989 {
3990         struct llog_ctxt *ctxt;
3991         int rc = 0;
3992         ENTRY;
3993
3994         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3995         if (ctxt) {
3996                 rc = llog_initiator_connect(ctxt);
3997                 llog_ctxt_put(ctxt);
3998         } else {
3999                 /* XXX return an error? skip setting the flags below? */
4000         }
4001
4002         cfs_spin_lock(&imp->imp_lock);
4003         imp->imp_server_timeout = 1;
4004         imp->imp_pingable = 1;
4005         cfs_spin_unlock(&imp->imp_lock);
4006         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4007
4008         RETURN(rc);
4009 }
4010
4011 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4012                                           struct ptlrpc_request *req,
4013                                           void *aa, int rc)
4014 {
4015         ENTRY;
4016         if (rc != 0)
4017                 RETURN(rc);
4018
4019         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4020 }
4021
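/* Handle obd_set_info_async() for the OSC.  KEY_NEXT_ID, KEY_CHECKSUM,
 * KEY_SPTLRPC_CONF and KEY_FLUSH_CTX are handled locally; everything else
 * is packed into an OST_SET_INFO RPC.  KEY_GRANT_SHRINK requests are queued
 * on the ptlrpcd set, all other forwarded keys need a caller-supplied set.
 *
 * Illustrative caller sketch -- an assumption, not taken from this file:
 *
 *      int on = 1;
 *      rc = obd_set_info_async(exp, strlen(KEY_CHECKSUM) + 1, KEY_CHECKSUM,
 *                              sizeof(on), &on, NULL);
 */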
4022 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4023                               void *key, obd_count vallen, void *val,
4024                               struct ptlrpc_request_set *set)
4025 {
4026         struct ptlrpc_request *req;
4027         struct obd_device     *obd = exp->exp_obd;
4028         struct obd_import     *imp = class_exp2cliimp(exp);
4029         char                  *tmp;
4030         int                    rc;
4031         ENTRY;
4032
4033         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4034
4035         if (KEY_IS(KEY_NEXT_ID)) {
4036                 obd_id new_val;
4037                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4038
4039                 if (vallen != sizeof(obd_id))
4040                         RETURN(-ERANGE);
4041                 if (val == NULL)
4042                         RETURN(-EINVAL);
4043
4047                 /* Avoid a race between allocating a new object and
4048                  * setting the next id from the ll_sync thread. */
4049                 cfs_spin_lock(&oscc->oscc_lock);
4050                 new_val = *((obd_id*)val) + 1;
4051                 if (new_val > oscc->oscc_next_id)
4052                         oscc->oscc_next_id = new_val;
4053                 cfs_spin_unlock(&oscc->oscc_lock);
4054                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4055                        exp->exp_obd->obd_name,
4056                        obd->u.cli.cl_oscc.oscc_next_id);
4057
4058                 RETURN(0);
4059         }
4060
4061         if (KEY_IS(KEY_CHECKSUM)) {
4062                 if (vallen != sizeof(int))
4063                         RETURN(-EINVAL);
4064                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4065                 RETURN(0);
4066         }
4067
4068         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4069                 sptlrpc_conf_client_adapt(obd);
4070                 RETURN(0);
4071         }
4072
4073         if (KEY_IS(KEY_FLUSH_CTX)) {
4074                 sptlrpc_import_flush_my_ctx(imp);
4075                 RETURN(0);
4076         }
4077
4078         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4079                 RETURN(-EINVAL);
4080
4081         /* We pass all other commands directly to the OST. Since nobody calls
4082            OSC methods directly and everyone is supposed to go through LOV, we
4083            assume LOV has already rejected invalid values for us.
4084            The only recognised values so far are evict_by_nid and mds_conn.
4085            Even if something bad slips through, the OST will return -EINVAL
4086            anyway. */
4087
4088         if (KEY_IS(KEY_GRANT_SHRINK))
4089                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4090         else
4091                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4092
4093         if (req == NULL)
4094                 RETURN(-ENOMEM);
4095
4096         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4097                              RCL_CLIENT, keylen);
4098         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4099                              RCL_CLIENT, vallen);
4100         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4101         if (rc) {
4102                 ptlrpc_request_free(req);
4103                 RETURN(rc);
4104         }
4105
4106         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4107         memcpy(tmp, key, keylen);
4108         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4109         memcpy(tmp, val, vallen);
4110
4111         if (KEY_IS(KEY_MDS_CONN)) {
4112                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4113
4114                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4115                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4116                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4117                 req->rq_no_delay = req->rq_no_resend = 1;
4118                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4119         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4120                 struct osc_grant_args *aa;
4121                 struct obdo *oa;
4122
4123                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4124                 aa = ptlrpc_req_async_args(req);
4125                 OBDO_ALLOC(oa);
4126                 if (!oa) {
4127                         ptlrpc_req_finished(req);
4128                         RETURN(-ENOMEM);
4129                 }
4130                 *oa = ((struct ost_body *)val)->oa;
4131                 aa->aa_oa = oa;
4132                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4133         }
4134
4135         ptlrpc_request_set_replen(req);
4136         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4137                 LASSERT(set != NULL);
4138                 ptlrpc_set_add_req(set, req);
4139                 ptlrpc_check_set(NULL, set);
4140         } else
4141                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4142
4143         RETURN(0);
4144 }
4145
4146
4147 static struct llog_operations osc_size_repl_logops = {
4148         .lop_cancel = llog_obd_repl_cancel
4149 };
4150
4151 static struct llog_operations osc_mds_ost_orig_logops;
4152
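/* Set up the two llog contexts used by an MDS-side OSC: the originator
 * context for MDS->OST records and the size-change replicator context.
 * If the second setup fails, the first context is cleaned up again so
 * nothing leaks. */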
4153 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4154                            struct obd_device *tgt, struct llog_catid *catid)
4155 {
4156         int rc;
4157         ENTRY;
4158
4159         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4160                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4161         if (rc) {
4162                 CERROR("failed to setup LLOG_MDS_OST_ORIG_CTXT\n");
4163                 GOTO(out, rc);
4164         }
4165
4166         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4167                         NULL, &osc_size_repl_logops);
4168         if (rc) {
4169                 struct llog_ctxt *ctxt =
4170                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4171                 if (ctxt)
4172                         llog_cleanup(ctxt);
4173                 CERROR("failed to setup LLOG_SIZE_REPL_CTXT\n");
4174         }
4175         GOTO(out, rc);
4176 out:
4177         if (rc) {
4178                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4179                        obd->obd_name, tgt->obd_name, catid, rc);
4180                 CERROR("logid "LPX64":0x%x\n",
4181                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4182         }
4183         return rc;
4184 }
4185
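/* obd_llog_init() entry point: read the catalog id for this index from the
 * CATLIST file on disk_obd, initialize the llog contexts with it, and write
 * the catalog id back, all serialized by olg_cat_processing. */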
4186 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4187                          struct obd_device *disk_obd, int *index)
4188 {
4189         struct llog_catid catid;
4190         static char name[32] = CATLIST;
4191         int rc;
4192         ENTRY;
4193
4194         LASSERT(olg == &obd->obd_olg);
4195
4196         cfs_mutex_down(&olg->olg_cat_processing);
4197         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4198         if (rc) {
4199                 CERROR("can't get llog catlist: rc = %d\n", rc);
4200                 GOTO(out, rc);
4201         }
4202
4203         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4204                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4205                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4206
4207         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4208         if (rc) {
4209                 CERROR("llog contexts setup failed: rc = %d\n", rc);
4210                 GOTO(out, rc);
4211         }
4212
4213         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4214         if (rc) {
4215                 CERROR("can't put llog catlist: rc = %d\n", rc);
4216                 GOTO(out, rc);
4217         }
4218
4219  out:
4220         cfs_mutex_up(&olg->olg_cat_processing);
4221
4222         return rc;
4223 }
4224
4225 static int osc_llog_finish(struct obd_device *obd, int count)
4226 {
4227         struct llog_ctxt *ctxt;
4228         int rc = 0, rc2 = 0;
4229         ENTRY;
4230
4231         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4232         if (ctxt)
4233                 rc = llog_cleanup(ctxt);
4234
4235         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4236         if (ctxt)
4237                 rc2 = llog_cleanup(ctxt);
4238         if (!rc)
4239                 rc = rc2;
4240
4241         RETURN(rc);
4242 }
4243
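/* Compute the grant to request on reconnect: ask for what we currently
 * hold (avail + dirty), or two full-sized RPCs worth if that is zero, and
 * reset the lost-grant accounting under cl_loi_list_lock. */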
4244 static int osc_reconnect(const struct lu_env *env,
4245                          struct obd_export *exp, struct obd_device *obd,
4246                          struct obd_uuid *cluuid,
4247                          struct obd_connect_data *data,
4248                          void *localdata)
4249 {
4250         struct client_obd *cli = &obd->u.cli;
4251
4252         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4253                 long lost_grant;
4254
4255                 client_obd_list_lock(&cli->cl_loi_list_lock);
4256                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4257                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4258                 lost_grant = cli->cl_lost_grant;
4259                 cli->cl_lost_grant = 0;
4260                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4261
4262                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4263                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4264                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4265                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4266                        " ocd_grant: %d\n", data->ocd_connect_flags,
4267                        data->ocd_version, data->ocd_grant);
4268         }
4269
4270         RETURN(0);
4271 }
4272
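/* Disconnect the export.  On the last disconnect, flush any remaining
 * size-change llog cancels out to the target first, and only drop the
 * client from the grant shrink list once the import is gone (see the
 * bug 18662 comment below). */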
4273 static int osc_disconnect(struct obd_export *exp)
4274 {
4275         struct obd_device *obd = class_exp2obd(exp);
4276         struct llog_ctxt  *ctxt;
4277         int rc;
4278
4279         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4280         if (ctxt) {
4281                 if (obd->u.cli.cl_conn_count == 1) {
4282                         /* Flush any remaining cancel messages out to the
4283                          * target */
4284                         llog_sync(ctxt, exp);
4285                 }
4286                 llog_ctxt_put(ctxt);
4287         } else {
4288                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4289                        obd);
4290         }
4291
4292         rc = client_disconnect_export(exp);
4293         /**
4294          * Initially we put del_shrink_grant before disconnect_export, but
4295          * that causes the following problem if setup (connect) and cleanup
4296          * (disconnect) run tangled together:
4297          *      connect p1                     disconnect p2
4298          *   ptlrpc_connect_import
4299          *     ...............               class_manual_cleanup
4300          *                                     osc_disconnect
4301          *                                     del_shrink_grant
4302          *   ptlrpc_connect_interpret
4303          *     init_grant_shrink
4304          *   add this client to shrink list
4305          *                                      cleanup_osc
4306          * Bang! pinger trigger the shrink.
4307          * So the osc should only be removed from the shrink list after we
4308          * are sure the import has been destroyed. See bug 18662.
4309          */
4310         if (obd->u.cli.cl_import == NULL)
4311                 osc_del_shrink_grant(&obd->u.cli);
4312         return rc;
4313 }
4314
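/* React to import state changes: freeze object creation and drop grants on
 * disconnect, clean up locks and pending pages on invalidation, re-enable
 * creation and re-read grant/portal settings when the connection returns,
 * and forward each event to the obd observer (normally the LOV). */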
4315 static int osc_import_event(struct obd_device *obd,
4316                             struct obd_import *imp,
4317                             enum obd_import_event event)
4318 {
4319         struct client_obd *cli;
4320         int rc = 0;
4321
4322         ENTRY;
4323         LASSERT(imp->imp_obd == obd);
4324
4325         switch (event) {
4326         case IMP_EVENT_DISCON: {
4327                 /* Only do this for the MDS OSCs. */
4328                 if (imp->imp_server_timeout) {
4329                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4330
4331                         cfs_spin_lock(&oscc->oscc_lock);
4332                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4333                         cfs_spin_unlock(&oscc->oscc_lock);
4334                 }
4335                 cli = &obd->u.cli;
4336                 client_obd_list_lock(&cli->cl_loi_list_lock);
4337                 cli->cl_avail_grant = 0;
4338                 cli->cl_lost_grant = 0;
4339                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4340                 break;
4341         }
4342         case IMP_EVENT_INACTIVE: {
4343                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4344                 break;
4345         }
4346         case IMP_EVENT_INVALIDATE: {
4347                 struct ldlm_namespace *ns = obd->obd_namespace;
4348                 struct lu_env         *env;
4349                 int                    refcheck;
4350
4351                 env = cl_env_get(&refcheck);
4352                 if (!IS_ERR(env)) {
4353                         /* Reset grants */
4354                         cli = &obd->u.cli;
4355                         client_obd_list_lock(&cli->cl_loi_list_lock);
4356                         /* All pages go into failing RPCs due to the
4357                          * invalid import. */
4358                         osc_check_rpcs(env, cli);
4359                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4360
4361                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4362                         cl_env_put(env, &refcheck);
4363                 } else
4364                         rc = PTR_ERR(env);
4365                 break;
4366         }
4367         case IMP_EVENT_ACTIVE: {
4368                 /* Only do this for the MDS OSCs. */
4369                 if (imp->imp_server_timeout) {
4370                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4371
4372                         cfs_spin_lock(&oscc->oscc_lock);
4373                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4374                         cfs_spin_unlock(&oscc->oscc_lock);
4375                 }
4376                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4377                 break;
4378         }
4379         case IMP_EVENT_OCD: {
4380                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4381
4382                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4383                         osc_init_grant(&obd->u.cli, ocd);
4384
4385                 /* See bug 7198 */
4386                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4387                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
4388
4389                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4390                 break;
4391         }
4392         case IMP_EVENT_DEACTIVATE: {
4393                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4394                 break;
4395         }
4396         case IMP_EVENT_ACTIVATE: {
4397                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4398                 break;
4399         }
4400         default:
4401                 CERROR("Unknown import event %d\n", event);
4402                 LBUG();
4403         }
4404         RETURN(rc);
4405 }
4406
4407 /**
4408  * Determine whether a lock can be canceled rather than replayed during
4409  * recovery; see bug 16774 for details.
4410  *
4411  * \retval zero the lock can't be canceled
4412  * \retval other ok to cancel
4413  */
4414 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4415 {
4416         check_res_locked(lock->l_resource);
4417
4418         /*
4419          * Cancel unused extent locks with granted mode LCK_PR or LCK_CR.
4420          *
4421          * XXX As a future improvement, we could also cancel an unused write
4422          * lock if it has no dirty data and no active mmaps.
4423          */
4424         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4425             (lock->l_granted_mode == LCK_PR ||
4426              lock->l_granted_mode == LCK_CR) &&
4427             (osc_dlm_lock_pageref(lock) == 0))
4428                 RETURN(1);
4429
4430         RETURN(0);
4431 }
4432
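/* Set up an OSC device: take a ptlrpcd reference, perform the common client
 * setup, register lprocfs entries, initialize the object creator, and
 * pre-allocate a small request pool for brw_interpret (see the comment
 * below). */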
4433 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4434 {
4435         int rc;
4436         ENTRY;
4437
4439         rc = ptlrpcd_addref();
4440         if (rc)
4441                 RETURN(rc);
4442
4443         rc = client_obd_setup(obd, lcfg);
4444         if (rc) {
4445                 ptlrpcd_decref();
4446         } else {
4447                 struct lprocfs_static_vars lvars = { 0 };
4448                 struct client_obd *cli = &obd->u.cli;
4449
4450                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4451                 lprocfs_osc_init_vars(&lvars);
4452                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4453                         lproc_osc_attach_seqstat(obd);
4454                         sptlrpc_lprocfs_cliobd_attach(obd);
4455                         ptlrpc_lprocfs_register_obd(obd);
4456                 }
4457
4458                 oscc_init(obd);
4459                 /* We need to allocate a few extra requests, because
4460                    brw_interpret tries to create new requests before freeing
4461                    previous ones. Ideally we would reserve 2x max_rpcs_in_flight,
4462                    but that is probably too much wasted RAM, so reserving 2
4463                    extra is a guess that should still work. */
4464                 cli->cl_import->imp_rq_pool =
4465                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4466                                             OST_MAXREQSIZE,
4467                                             ptlrpc_add_rqs_to_pool);
4468
4469                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4470                 cfs_sema_init(&cli->cl_grant_sem, 1);
4471
4472                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4473         }
4474
4475         RETURN(rc);
4476 }
4477
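/* Staged pre-cleanup.  OBD_CLEANUP_EARLY deactivates the import and stops
 * pinging it; OBD_CLEANUP_EXPORTS destroys an import that was set up but
 * never cleaned up by a disconnect, frees its request pool, and shuts down
 * the llog subsystem. */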
4478 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4479 {
4480         int rc = 0;
4481         ENTRY;
4482
4483         switch (stage) {
4484         case OBD_CLEANUP_EARLY: {
4485                 struct obd_import *imp;
4486                 imp = obd->u.cli.cl_import;
4487                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4488                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4489                 ptlrpc_deactivate_import(imp);
4490                 cfs_spin_lock(&imp->imp_lock);
4491                 imp->imp_pingable = 0;
4492                 cfs_spin_unlock(&imp->imp_lock);
4493                 break;
4494         }
4495         case OBD_CLEANUP_EXPORTS: {
4496                 /* If we set up but never connected, the
4497                    client import will not have been cleaned. */
4498                 if (obd->u.cli.cl_import) {
4499                         struct obd_import *imp;
4500                         cfs_down_write(&obd->u.cli.cl_sem);
4501                         imp = obd->u.cli.cl_import;
4502                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4503                                obd->obd_name);
4504                         ptlrpc_invalidate_import(imp);
4505                         if (imp->imp_rq_pool) {
4506                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4507                                 imp->imp_rq_pool = NULL;
4508                         }
4509                         class_destroy_import(imp);
4510                         cfs_up_write(&obd->u.cli.cl_sem);
4511                         obd->u.cli.cl_import = NULL;
4512                 }
4513                 rc = obd_llog_finish(obd, 0);
4514                 if (rc != 0)
4515                         CERROR("failed to cleanup llogging subsystems\n");
4516                 break;
4517         }
4518         }
4519         RETURN(rc);
4520 }
4521
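/* Final cleanup: unregister lprocfs entries, release the osc quota cache,
 * tear down the common client state, and drop our ptlrpcd reference. */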
4522 int osc_cleanup(struct obd_device *obd)
4523 {
4524         int rc;
4525
4526         ENTRY;
4527         ptlrpc_lprocfs_unregister_obd(obd);
4528         lprocfs_obd_cleanup(obd);
4529
4530         /* free memory of osc quota cache */
4531         lquota_cleanup(quota_interface, obd);
4532
4533         rc = client_obd_cleanup(obd);
4534
4535         ptlrpcd_decref();
4536         RETURN(rc);
4537 }
4538
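/* Apply a configuration log record to this OSC.  Any command we don't
 * handle explicitly is treated as a tunable and passed to
 * class_process_proc_param() under the PARAM_OSC prefix. */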
4539 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4540 {
4541         struct lprocfs_static_vars lvars = { 0 };
4542         int rc = 0;
4543
4544         lprocfs_osc_init_vars(&lvars);
4545
4546         switch (lcfg->lcfg_command) {
4547         default:
4548                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4549                                               lcfg, obd);
4550                 if (rc > 0)
4551                         rc = 0;
4552                 break;
4553         }
4554
4555         return rc;
4556 }
4557
4558 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4559 {
4560         return osc_process_config_base(obd, buf);
4561 }
4562
4563 struct obd_ops osc_obd_ops = {
4564         .o_owner                = THIS_MODULE,
4565         .o_setup                = osc_setup,
4566         .o_precleanup           = osc_precleanup,
4567         .o_cleanup              = osc_cleanup,
4568         .o_add_conn             = client_import_add_conn,
4569         .o_del_conn             = client_import_del_conn,
4570         .o_connect              = client_connect_import,
4571         .o_reconnect            = osc_reconnect,
4572         .o_disconnect           = osc_disconnect,
4573         .o_statfs               = osc_statfs,
4574         .o_statfs_async         = osc_statfs_async,
4575         .o_packmd               = osc_packmd,
4576         .o_unpackmd             = osc_unpackmd,
4577         .o_precreate            = osc_precreate,
4578         .o_create               = osc_create,
4579         .o_create_async         = osc_create_async,
4580         .o_destroy              = osc_destroy,
4581         .o_getattr              = osc_getattr,
4582         .o_getattr_async        = osc_getattr_async,
4583         .o_setattr              = osc_setattr,
4584         .o_setattr_async        = osc_setattr_async,
4585         .o_brw                  = osc_brw,
4586         .o_punch                = osc_punch,
4587         .o_sync                 = osc_sync,
4588         .o_enqueue              = osc_enqueue,
4589         .o_change_cbdata        = osc_change_cbdata,
4590         .o_find_cbdata          = osc_find_cbdata,
4591         .o_cancel               = osc_cancel,
4592         .o_cancel_unused        = osc_cancel_unused,
4593         .o_iocontrol            = osc_iocontrol,
4594         .o_get_info             = osc_get_info,
4595         .o_set_info_async       = osc_set_info_async,
4596         .o_import_event         = osc_import_event,
4597         .o_llog_init            = osc_llog_init,
4598         .o_llog_finish          = osc_llog_finish,
4599         .o_process_config       = osc_process_config,
4600 };
4601
4602 extern struct lu_kmem_descr osc_caches[];
4603 extern cfs_spinlock_t       osc_ast_guard;
4604 extern cfs_lock_class_key_t osc_ast_guard_class;
4605
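/* Module initialization: set up the cl_object caches, hook in the quota
 * interface, register the OSC obd type, and derive the originator logops
 * from llog_lvfs_ops with OSC-specific setup/cleanup/add/connect methods. */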
4606 int __init osc_init(void)
4607 {
4608         struct lprocfs_static_vars lvars = { 0 };
4609         int rc;
4610         ENTRY;
4611
4612         /* Print the address of _any_ initialized kernel symbol from this
4613          * module, to allow debugging with a gdb that doesn't support data
4614          * symbols from modules. */
4615         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4616
4617         rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);
4618
4619         lprocfs_osc_init_vars(&lvars);
4620
4621         cfs_request_module("lquota");
4622         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4623         lquota_init(quota_interface);
4624         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4625
4626         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4627                                  LUSTRE_OSC_NAME, &osc_device_type);
4628         if (rc) {
4629                 if (quota_interface)
4630                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4631                 lu_kmem_fini(osc_caches);
4632                 RETURN(rc);
4633         }
4634
4635         cfs_spin_lock_init(&osc_ast_guard);
4636         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4637
4638         osc_mds_ost_orig_logops = llog_lvfs_ops;
4639         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4640         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4641         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4642         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4643
4644         RETURN(rc);
4645 }
4646
4647 #ifdef __KERNEL__
4648 static void /*__exit*/ osc_exit(void)
4649 {
4650         lu_device_type_fini(&osc_device_type);
4651
4652         lquota_exit(quota_interface);
4653         if (quota_interface)
4654                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4655
4656         class_unregister_type(LUSTRE_OSC_NAME);
4657         lu_kmem_fini(osc_caches);
4658 }
4659
4660 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4661 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4662 MODULE_LICENSE("GPL");
4663
4664 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4665 #endif