Whamcloud - gitweb
b7d986fd261db5a5ac8d0da7428783cae4bfc648
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #ifdef __KERNEL__
43 # include <libcfs/libcfs.h>
44 #else /* __KERNEL__ */
45 # include <liblustre.h>
46 #endif
47
48 # include <lustre_dlm.h>
49 #include <libcfs/kp30.h>
50 #include <lustre_net.h>
51 #include <lustre/lustre_user.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include <lustre_cache.h>
65 #include "osc_internal.h"
66
67 static quota_interface_t *quota_interface = NULL;
68 extern quota_interface_t osc_quota_interface;
69
70 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
71 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 static quota_interface_t *quota_interface;
75 extern quota_interface_t osc_quota_interface;
76
77 /* by default 10s */
78 atomic_t osc_resend_time;
79
80 /* Pack OSC object metadata for disk storage (LE byte order). */
81 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
82                       struct lov_stripe_md *lsm)
83 {
84         int lmm_size;
85         ENTRY;
86
87         lmm_size = sizeof(**lmmp);
88         if (!lmmp)
89                 RETURN(lmm_size);
90
91         if (*lmmp && !lsm) {
92                 OBD_FREE(*lmmp, lmm_size);
93                 *lmmp = NULL;
94                 RETURN(0);
95         }
96
97         if (!*lmmp) {
98                 OBD_ALLOC(*lmmp, lmm_size);
99                 if (!*lmmp)
100                         RETURN(-ENOMEM);
101         }
102
103         if (lsm) {
104                 LASSERT(lsm->lsm_object_id);
105                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
106         }
107
108         RETURN(lmm_size);
109 }
110
111 /* Unpack OSC object metadata from disk storage (LE byte order). */
112 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
113                         struct lov_mds_md *lmm, int lmm_bytes)
114 {
115         int lsm_size;
116         ENTRY;
117
118         if (lmm != NULL) {
119                 if (lmm_bytes < sizeof (*lmm)) {
120                         CERROR("lov_mds_md too small: %d, need %d\n",
121                                lmm_bytes, (int)sizeof(*lmm));
122                         RETURN(-EINVAL);
123                 }
124                 /* XXX LOV_MAGIC etc check? */
125
126                 if (lmm->lmm_object_id == 0) {
127                         CERROR("lov_mds_md: zero lmm_object_id\n");
128                         RETURN(-EINVAL);
129                 }
130         }
131
132         lsm_size = lov_stripe_md_size(1);
133         if (lsmp == NULL)
134                 RETURN(lsm_size);
135
136         if (*lsmp != NULL && lmm == NULL) {
137                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138                 OBD_FREE(*lsmp, lsm_size);
139                 *lsmp = NULL;
140                 RETURN(0);
141         }
142
143         if (*lsmp == NULL) {
144                 OBD_ALLOC(*lsmp, lsm_size);
145                 if (*lsmp == NULL)
146                         RETURN(-ENOMEM);
147                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
148                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
149                         OBD_FREE(*lsmp, lsm_size);
150                         RETURN(-ENOMEM);
151                 }
152                 loi_init((*lsmp)->lsm_oinfo[0]);
153         }
154
155         if (lmm != NULL) {
156                 /* XXX zero *lsmp? */
157                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
158                 LASSERT((*lsmp)->lsm_object_id);
159         }
160
161         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
162
163         RETURN(lsm_size);
164 }
165
166 static int osc_getattr_interpret(struct ptlrpc_request *req,
167                                  struct osc_async_args *aa, int rc)
168 {
169         struct ost_body *body;
170         ENTRY;
171
172         if (rc != 0)
173                 GOTO(out, rc);
174
175         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
176                                   lustre_swab_ost_body);
177         if (body) {
178                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
179                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
180
181                 /* This should really be sent by the OST */
182                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
183                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
184         } else {
185                 CERROR("can't unpack ost_body\n");
186                 rc = -EPROTO;
187                 aa->aa_oi->oi_oa->o_valid = 0;
188         }
189 out:
190         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
191         RETURN(rc);
192 }
193
194 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
195                              struct ptlrpc_request_set *set)
196 {
197         struct ptlrpc_request *req;
198         struct ost_body *body;
199         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
200         struct osc_async_args *aa;
201         ENTRY;
202
203         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
204                               OST_GETATTR, 2, size,NULL);
205         if (!req)
206                 RETURN(-ENOMEM);
207
208         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
209         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
210
211         ptlrpc_req_set_repsize(req, 2, size);
212         req->rq_interpret_reply = osc_getattr_interpret;
213
214         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
215         aa = ptlrpc_req_async_args(req);
216         aa->aa_oi = oinfo;
217
218         ptlrpc_set_add_req(set, req);
219         RETURN (0);
220 }
221
222 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
223 {
224         struct ptlrpc_request *req;
225         struct ost_body *body;
226         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
227         int rc;
228         ENTRY;
229
230         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
231                               OST_GETATTR, 2, size, NULL);
232         if (!req)
233                 RETURN(-ENOMEM);
234
235         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
236         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
237
238         ptlrpc_req_set_repsize(req, 2, size);
239
240         rc = ptlrpc_queue_wait(req);
241         if (rc) {
242                 CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
243                 GOTO(out, rc);
244         }
245
246         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
247                                   lustre_swab_ost_body);
248         if (body == NULL) {
249                 CERROR ("can't unpack ost_body\n");
250                 GOTO (out, rc = -EPROTO);
251         }
252
253         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
254         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
255
256         /* This should really be sent by the OST */
257         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
258         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
259
260         EXIT;
261  out:
262         ptlrpc_req_finished(req);
263         return rc;
264 }
265
266 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
267                        struct obd_trans_info *oti)
268 {
269         struct ptlrpc_request *req;
270         struct ost_body *body;
271         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
272         int rc;
273         ENTRY;
274
275         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
276                               OST_SETATTR, 2, size, NULL);
277         if (!req)
278                 RETURN(-ENOMEM);
279
280         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
281         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
282
283         ptlrpc_req_set_repsize(req, 2, size);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
290                                   lustre_swab_ost_body);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
295
296         EXIT;
297 out:
298         ptlrpc_req_finished(req);
299         RETURN(rc);
300 }
301
302 static int osc_setattr_interpret(struct ptlrpc_request *req,
303                                  struct osc_async_args *aa, int rc)
304 {
305         struct ost_body *body;
306         ENTRY;
307
308         if (rc != 0)
309                 GOTO(out, rc);
310
311         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
312                                   lustre_swab_ost_body);
313         if (body == NULL) {
314                 CERROR("can't unpack ost_body\n");
315                 GOTO(out, rc = -EPROTO);
316         }
317
318         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
319 out:
320         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
321         RETURN(rc);
322 }
323
324 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
325                              struct obd_trans_info *oti,
326                              struct ptlrpc_request_set *rqset)
327 {
328         struct ptlrpc_request *req;
329         struct ost_body *body;
330         __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
331         int bufcount = 2;
332         struct osc_async_args *aa;
333         ENTRY;
334
335         if (osc_exp_is_2_0_server(exp)) {
336                 bufcount = 3;
337         }
338
339         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
340                               OST_SETATTR, bufcount, size, NULL);
341         if (!req)
342                 RETURN(-ENOMEM);
343
344         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
345
346         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
347                 LASSERT(oti);
348                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
349         }
350
351         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
352         ptlrpc_req_set_repsize(req, 2, size);
353         /* do mds to ost setattr asynchronouly */
354         if (!rqset) {
355                 /* Do not wait for response. */
356                 ptlrpcd_add_req(req);
357         } else {
358                 req->rq_interpret_reply = osc_setattr_interpret;
359
360                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
361                 aa = ptlrpc_req_async_args(req);
362                 aa->aa_oi = oinfo;
363
364                 ptlrpc_set_add_req(rqset, req);
365         }
366
367         RETURN(0);
368 }
369
370 int osc_real_create(struct obd_export *exp, struct obdo *oa,
371                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
372 {
373         struct ptlrpc_request *req;
374         struct ost_body *body;
375         struct lov_stripe_md *lsm;
376         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
377         int rc;
378         ENTRY;
379
380         LASSERT(oa);
381         LASSERT(ea);
382
383         lsm = *ea;
384         if (!lsm) {
385                 rc = obd_alloc_memmd(exp, &lsm);
386                 if (rc < 0)
387                         RETURN(rc);
388         }
389
390         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
391                               OST_CREATE, 2, size, NULL);
392         if (!req)
393                 GOTO(out, rc = -ENOMEM);
394
395         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
396         memcpy(&body->oa, oa, sizeof(body->oa));
397
398         ptlrpc_req_set_repsize(req, 2, size);
399         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
400             oa->o_flags == OBD_FL_DELORPHAN) {
401                 DEBUG_REQ(D_HA, req,
402                           "delorphan from OST integration");
403                 /* Don't resend the delorphan req */
404                 req->rq_no_resend = req->rq_no_delay = 1;
405         }
406
407         rc = ptlrpc_queue_wait(req);
408         if (rc)
409                 GOTO(out_req, rc);
410
411         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
412                                   lustre_swab_ost_body);
413         if (body == NULL) {
414                 CERROR ("can't unpack ost_body\n");
415                 GOTO (out_req, rc = -EPROTO);
416         }
417
418         memcpy(oa, &body->oa, sizeof(*oa));
419
420         /* This should really be sent by the OST */
421         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
422         oa->o_valid |= OBD_MD_FLBLKSZ;
423
424         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
425          * have valid lsm_oinfo data structs, so don't go touching that.
426          * This needs to be fixed in a big way.
427          */
428         lsm->lsm_object_id = oa->o_id;
429         *ea = lsm;
430
431         if (oti != NULL) {
432                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
433
434                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
435                         if (!oti->oti_logcookies)
436                                 oti_alloc_cookies(oti, 1);
437                         *oti->oti_logcookies = oa->o_lcookie;
438                 }
439         }
440
441         CDEBUG(D_HA, "transno: "LPD64"\n",
442                lustre_msg_get_transno(req->rq_repmsg));
443 out_req:
444         ptlrpc_req_finished(req);
445 out:
446         if (rc && !*ea)
447                 obd_free_memmd(exp, &lsm);
448         RETURN(rc);
449 }
450
451 static int osc_punch_interpret(struct ptlrpc_request *req,
452                                struct osc_async_args *aa, int rc)
453 {
454         struct ost_body *body;
455         ENTRY;
456
457         if (rc != 0)
458                 GOTO(out, rc);
459
460         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),
461                                   lustre_swab_ost_body);
462         if (body == NULL) {
463                 CERROR ("can't unpack ost_body\n");
464                 GOTO(out, rc = -EPROTO);
465         }
466
467         memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
468 out:
469         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
470         RETURN(rc);
471 }
472
473 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
474                      struct obd_trans_info *oti,
475                      struct ptlrpc_request_set *rqset)
476 {
477         struct ptlrpc_request *req;
478         struct osc_async_args *aa;
479         struct ost_body *body;
480         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
481         ENTRY;
482
483         if (!oinfo->oi_oa) {
484                 CERROR("oa NULL\n");
485                 RETURN(-EINVAL);
486         }
487
488         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
489                               OST_PUNCH, 2, size, NULL);
490         if (!req)
491                 RETURN(-ENOMEM);
492
493         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
494         ptlrpc_at_set_req_timeout(req);
495
496         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
497         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
498
499         /* overload the size and blocks fields in the oa with start/end */
500         body->oa.o_size = oinfo->oi_policy.l_extent.start;
501         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
502         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
503
504         ptlrpc_req_set_repsize(req, 2, size);
505
506         req->rq_interpret_reply = osc_punch_interpret;
507         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
508         aa = ptlrpc_req_async_args(req);
509         aa->aa_oi = oinfo;
510         ptlrpc_set_add_req(rqset, req);
511
512         RETURN(0);
513 }
514
515 static int osc_sync_interpret(struct ptlrpc_request *req,
516                               struct osc_async_args *aa, int rc)
517 {
518         struct ost_body *body;
519         ENTRY;
520
521         if (rc)
522                 GOTO(out, rc);
523
524         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
525                                   lustre_swab_ost_body);
526         if (body == NULL) {
527                 CERROR ("can't unpack ost_body\n");
528                 GOTO(out, rc = -EPROTO);
529         }
530
531         *aa->aa_oi->oi_oa = body->oa;
532 out:
533         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
534         RETURN(rc);
535 }
536
537 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
538                     obd_size start, obd_size end,
539                     struct ptlrpc_request_set *set)
540 {
541         struct ptlrpc_request *req;
542         struct ost_body *body;
543         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
544         struct osc_async_args *aa;
545         ENTRY;
546
547         if (!oinfo->oi_oa) {
548                 CERROR("oa NULL\n");
549                 RETURN(-EINVAL);
550         }
551
552         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
553                               OST_SYNC, 2, size, NULL);
554         if (!req)
555                 RETURN(-ENOMEM);
556
557         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
558         memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
559
560         /* overload the size and blocks fields in the oa with start/end */
561         body->oa.o_size = start;
562         body->oa.o_blocks = end;
563         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
564
565         ptlrpc_req_set_repsize(req, 2, size);
566         req->rq_interpret_reply = osc_sync_interpret;
567
568         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
569         aa = ptlrpc_req_async_args(req);
570         aa->aa_oi = oinfo;
571
572         ptlrpc_set_add_req(set, req);
573         RETURN (0);
574 }
575
576 /* Find and cancel locally locks matched by @mode in the resource found by
577  * @objid. Found locks are added into @cancel list. Returns the amount of
578  * locks added to @cancels list. */
579 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
580                                    struct list_head *cancels, ldlm_mode_t mode,
581                                    int lock_flags)
582 {
583         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
584         struct ldlm_res_id res_id;
585         struct ldlm_resource *res;
586         int count;
587         ENTRY;
588
589         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
590         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
591         if (res == NULL)
592                 RETURN(0);
593
594         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
595                                            lock_flags, 0, NULL);
596         ldlm_resource_putref(res);
597         RETURN(count);
598 }
599
600 static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,
601                                  int rc)
602 {
603         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
604
605         atomic_dec(&cli->cl_destroy_in_flight);
606         cfs_waitq_signal(&cli->cl_destroy_waitq);
607         return 0;
608 }
609
610 static int osc_can_send_destroy(struct client_obd *cli)
611 {
612         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
613             cli->cl_max_rpcs_in_flight) {
614                 /* The destroy request can be sent */
615                 return 1;
616         }
617         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
618             cli->cl_max_rpcs_in_flight) {
619                 /*
620                  * The counter has been modified between the two atomic
621                  * operations.
622                  */
623                 cfs_waitq_signal(&cli->cl_destroy_waitq);
624         }
625         return 0;
626 }
627
628 /* Destroy requests can be async always on the client, and we don't even really
629  * care about the return code since the client cannot do anything at all about
630  * a destroy failure.
631  * When the MDS is unlinking a filename, it saves the file objects into a
632  * recovery llog, and these object records are cancelled when the OST reports
633  * they were destroyed and sync'd to disk (i.e. transaction committed).
634  * If the client dies, or the OST is down when the object should be destroyed,
635  * the records are not cancelled, and when the OST reconnects to the MDS next,
636  * it will retrieve the llog unlink logs and then sends the log cancellation
637  * cookies to the MDS after committing destroy transactions. */
638 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
639                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
640                        struct obd_export *md_export)
641 {
642         CFS_LIST_HEAD(cancels);
643         struct ptlrpc_request *req;
644         struct ost_body *body;
645         __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
646                         sizeof(struct ldlm_request) };
647         int count, bufcount = 2;
648         struct client_obd *cli = &exp->exp_obd->u.cli;
649         ENTRY;
650
651         if (!oa) {
652                 CERROR("oa NULL\n");
653                 RETURN(-EINVAL);
654         }
655
656         LASSERT(oa->o_id != 0);
657
658         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
659                                         LDLM_FL_DISCARD_DATA);
660         if (exp_connect_cancelset(exp))
661                 bufcount = 3;
662         req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
663                                 size, REQ_REC_OFF + 1, 0, &cancels, count);
664         if (!req)
665                 RETURN(-ENOMEM);
666
667         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
668         ptlrpc_at_set_req_timeout(req);
669
670         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
671
672         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
673                 oa->o_lcookie = *oti->oti_logcookies;
674         }
675
676         memcpy(&body->oa, oa, sizeof(*oa));
677         ptlrpc_req_set_repsize(req, 2, size);
678
679         /* don't throttle destroy RPCs for the MDT */
680         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
681                 req->rq_interpret_reply = osc_destroy_interpret;
682                 if (!osc_can_send_destroy(cli)) {
683                         struct l_wait_info lwi = { 0 };
684
685                         /*
686                          * Wait until the number of on-going destroy RPCs drops
687                          * under max_rpc_in_flight
688                          */
689                         l_wait_event_exclusive(cli->cl_destroy_waitq,
690                                                osc_can_send_destroy(cli), &lwi);
691                 }
692         }
693
694         /* Do not wait for response */
695         ptlrpcd_add_req(req);
696         RETURN(0);
697 }
698
699 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
700                                 long writing_bytes)
701 {
702         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
703
704         LASSERT(!(oa->o_valid & bits));
705
706         oa->o_valid |= bits;
707         client_obd_list_lock(&cli->cl_loi_list_lock);
708         oa->o_dirty = cli->cl_dirty;
709         if (cli->cl_dirty > cli->cl_dirty_max) {
710                 CERROR("dirty %lu > dirty_max %lu\n",
711                        cli->cl_dirty, cli->cl_dirty_max);
712                 oa->o_undirty = 0;
713         } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {
714                 CERROR("dirty %d > system dirty_max %d\n",
715                        atomic_read(&obd_dirty_pages), obd_max_dirty_pages);
716                 oa->o_undirty = 0;
717         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
718                 CERROR("dirty %lu - dirty_max %lu too big???\n",
719                        cli->cl_dirty, cli->cl_dirty_max);
720                 oa->o_undirty = 0;
721         } else {
722                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
723                                 (cli->cl_max_rpcs_in_flight + 1);
724                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
725         }
726         oa->o_grant = cli->cl_avail_grant;
727         oa->o_dropped = cli->cl_lost_grant;
728         cli->cl_lost_grant = 0;
729         client_obd_list_unlock(&cli->cl_loi_list_lock);
730         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
731                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
732 }
733
734 /* caller must hold loi_list_lock */
735 static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
736 {
737         atomic_inc(&obd_dirty_pages);
738         cli->cl_dirty += CFS_PAGE_SIZE;
739         cli->cl_avail_grant -= CFS_PAGE_SIZE;
740         pga->flag |= OBD_BRW_FROM_GRANT;
741         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
742                CFS_PAGE_SIZE, pga, pga->pg);
743         LASSERTF(cli->cl_avail_grant >= 0, "invalid avail grant is %ld \n",
744                  cli->cl_avail_grant);
745 }
746
747 /* the companion to osc_consume_write_grant, called when a brw has completed.
748  * must be called with the loi lock held. */
749 static void osc_release_write_grant(struct client_obd *cli,
750                                     struct brw_page *pga, int sent)
751 {
752         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
753         ENTRY;
754
755         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
756                 EXIT;
757                 return;
758         }
759
760         pga->flag &= ~OBD_BRW_FROM_GRANT;
761         atomic_dec(&obd_dirty_pages);
762         cli->cl_dirty -= CFS_PAGE_SIZE;
763         if (!sent) {
764                 cli->cl_lost_grant += CFS_PAGE_SIZE;
765                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
766                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
767         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
768                 /* For short writes we shouldn't count parts of pages that
769                  * span a whole block on the OST side, or our accounting goes
770                  * wrong.  Should match the code in filter_grant_check. */
771                 int offset = pga->off & ~CFS_PAGE_MASK;
772                 int count = pga->count + (offset & (blocksize - 1));
773                 int end = (offset + pga->count) & (blocksize - 1);
774                 if (end)
775                         count += blocksize - end;
776
777                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
778                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
779                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
780                        cli->cl_avail_grant, cli->cl_dirty);
781         }
782
783         EXIT;
784 }
785
786 static unsigned long rpcs_in_flight(struct client_obd *cli)
787 {
788         return cli->cl_r_in_flight + cli->cl_w_in_flight;
789 }
790
791 /* caller must hold loi_list_lock */
792 void osc_wake_cache_waiters(struct client_obd *cli)
793 {
794         struct list_head *l, *tmp;
795         struct osc_cache_waiter *ocw;
796
797         ENTRY;
798         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
799                 /* if we can't dirty more, we must wait until some is written */
800                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
801                    ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
802                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
803                                "osc max %ld, sys max %d\n", cli->cl_dirty,
804                                cli->cl_dirty_max, obd_max_dirty_pages);
805                         return;
806                 }
807
808                 /* if still dirty cache but no grant wait for pending RPCs that
809                  * may yet return us some grant before doing sync writes */
810                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
811                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
812                                cli->cl_w_in_flight);
813                         return;
814                 }
815
816                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
817                 list_del_init(&ocw->ocw_entry);
818                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
819                         /* no more RPCs in flight to return grant, do sync IO */
820                         ocw->ocw_rc = -EDQUOT;
821                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
822                 } else {
823                         osc_consume_write_grant(cli,
824                                                 &ocw->ocw_oap->oap_brw_page);
825                 }
826
827                 cfs_waitq_signal(&ocw->ocw_waitq);
828         }
829
830         EXIT;
831 }
832
833 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
834 {
835         client_obd_list_lock(&cli->cl_loi_list_lock);
836         cli->cl_avail_grant = ocd->ocd_grant;
837         client_obd_list_unlock(&cli->cl_loi_list_lock);
838
839         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld\n",
840                cli->cl_avail_grant, cli->cl_lost_grant);
841         LASSERT(cli->cl_avail_grant >= 0);
842 }
843
844 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
845 {
846         client_obd_list_lock(&cli->cl_loi_list_lock);
847         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
848         if (body->oa.o_valid & OBD_MD_FLGRANT)
849                 cli->cl_avail_grant += body->oa.o_grant;
850         /* waiters are woken in brw_interpret */
851         client_obd_list_unlock(&cli->cl_loi_list_lock);
852 }
853
854 /* We assume that the reason this OSC got a short read is because it read
855  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
856  * via the LOV, and it _knows_ it's reading inside the file, it's just that
857  * this stripe never got written at or beyond this stripe offset yet. */
858 static void handle_short_read(int nob_read, obd_count page_count,
859                               struct brw_page **pga)
860 {
861         char *ptr;
862         int i = 0;
863
864         /* skip bytes read OK */
865         while (nob_read > 0) {
866                 LASSERT (page_count > 0);
867
868                 if (pga[i]->count > nob_read) {
869                         /* EOF inside this page */
870                         ptr = cfs_kmap(pga[i]->pg) +
871                                 (pga[i]->off & ~CFS_PAGE_MASK);
872                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
873                         cfs_kunmap(pga[i]->pg);
874                         page_count--;
875                         i++;
876                         break;
877                 }
878
879                 nob_read -= pga[i]->count;
880                 page_count--;
881                 i++;
882         }
883
884         /* zero remaining pages */
885         while (page_count-- > 0) {
886                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
887                 memset(ptr, 0, pga[i]->count);
888                 cfs_kunmap(pga[i]->pg);
889                 i++;
890         }
891 }
892
893 static int check_write_rcs(struct ptlrpc_request *req,
894                            int requested_nob, int niocount,
895                            obd_count page_count, struct brw_page **pga)
896 {
897         int    *remote_rcs, i;
898
899         /* return error if any niobuf was in error */
900         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
901                                         sizeof(*remote_rcs) * niocount, NULL);
902         if (remote_rcs == NULL) {
903                 CERROR("Missing/short RC vector on BRW_WRITE reply\n");
904                 return(-EPROTO);
905         }
906         if (lustre_rep_need_swab(req))
907                 for (i = 0; i < niocount; i++)
908                         __swab32s(&remote_rcs[i]);
909
910         for (i = 0; i < niocount; i++) {
911                 if (remote_rcs[i] < 0)
912                         return(remote_rcs[i]);
913
914                 if (remote_rcs[i] != 0) {
915                         CERROR("rc[%d] invalid (%d) req %p\n",
916                                 i, remote_rcs[i], req);
917                         return(-EPROTO);
918                 }
919         }
920
921         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
922                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
923                        req->rq_bulk->bd_nob_transferred, requested_nob);
924                 return(-EPROTO);
925         }
926
927         return (0);
928 }
929
930 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
931 {
932         if (p1->flag != p2->flag) {
933                 unsigned mask = ~OBD_BRW_FROM_GRANT;
934
935                 /* warn if we try to combine flags that we don't know to be
936                  * safe to combine */
937                 if ((p1->flag & mask) != (p2->flag & mask))
938                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
939                                "same brw?\n", p1->flag, p2->flag);
940                 return 0;
941         }
942
943         return (p1->off + p1->count == p2->off);
944 }
945
946 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
947                                    struct brw_page **pga, int opc,
948                                    cksum_type_t cksum_type)
949 {
950         __u32 cksum;
951         int i = 0;
952
953         LASSERT (pg_count > 0);
954         cksum = init_checksum(cksum_type);
955         while (nob > 0 && pg_count > 0) {
956                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
957                 int off = pga[i]->off & ~CFS_PAGE_MASK;
958                 int count = pga[i]->count > nob ? nob : pga[i]->count;
959
960                 /* corrupt the data before we compute the checksum, to
961                  * simulate an OST->client data error */
962                 if (i == 0 && opc == OST_READ &&
963                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
964                         memcpy(ptr + off, "bad1", min(4, nob));
965                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
966                 cfs_kunmap(pga[i]->pg);
967                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
968                                off, cksum);
969
970                 nob -= pga[i]->count;
971                 pg_count--;
972                 i++;
973         }
974         /* For sending we only compute the wrong checksum instead
975          * of corrupting the data so it is still correct on a redo */
976         if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
977                 cksum++;
978
979         return cksum;
980 }
981
    /*
     * Build (but do not send) a bulk read or write request for the
     * page_count pages in pga.
     *
     * cmd selects the opcode: OST_WRITE when OBD_BRW_WRITE is set, OST_READ
     * otherwise.  Writes allocate the request from the import's request
     * pool, reads pass a NULL pool.  The pages are asserted to be in
     * strictly increasing offset order and to agree on OBD_BRW_SRVLOCK.
     * lsm is not referenced in this function.
     *
     * On success 0 is returned and *reqp holds the new request, whose async
     * args record oa, pga, the page/niobuf counts and the byte total.
     * Returns -ENOMEM when the request or the bulk descriptor cannot be
     * allocated (the request is released in the latter case); the two
     * OBD_FAIL_RETURN hooks can inject -ENOMEM / -EINVAL.
     *
     * Side effect on the caller's oa for writes: checksum flags and o_cksum
     * are updated to match what was put on the wire.
     */
982 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
983                                 struct lov_stripe_md *lsm, obd_count page_count,
984                                 struct brw_page **pga,
985                                 struct ptlrpc_request **reqp)
986 {
987         struct ptlrpc_request   *req;
988         struct ptlrpc_bulk_desc *desc;
989         struct ost_body         *body;
990         struct obd_ioobj        *ioobj;
991         struct niobuf_remote    *niobuf;
992         __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
993         int niocount, i, requested_nob, opc, rc;
994         struct ptlrpc_request_pool *pool;
995         struct osc_brw_async_args *aa;
996         struct brw_page *pg_prev;
997
998         ENTRY;
999         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
1000         OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
1001
1002         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
1003         pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
1004
             /* count the remote niobufs needed: a new one starts wherever two
              * neighbouring pages cannot be merged */
1005         for (niocount = i = 1; i < page_count; i++) {
1006                 if (!can_merge_pages(pga[i - 1], pga[i]))
1007                         niocount++;
1008         }
1009
1010         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
1011         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
1012
1013         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
1014                                    NULL, pool);
1015         if (req == NULL)
1016                 RETURN (-ENOMEM);
1017
1018         req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */
1019         ptlrpc_at_set_req_timeout(req);
1020
1021         if (opc == OST_WRITE)
1022                 desc = ptlrpc_prep_bulk_imp (req, page_count,
1023                                              BULK_GET_SOURCE, OST_BULK_PORTAL);
1024         else
1025                 desc = ptlrpc_prep_bulk_imp (req, page_count,
1026                                              BULK_PUT_SINK, OST_BULK_PORTAL);
1027         if (desc == NULL)
1028                 GOTO(out, rc = -ENOMEM);
1029         /* NB request now owns desc and will free it when it gets freed */
1030
1031         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
1032         ioobj = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioobj));
1033         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1034                                 niocount * sizeof(*niobuf));
1035
1036         memcpy(&body->oa, oa, sizeof(*oa));
1037
1038         obdo_to_ioobj(oa, ioobj);
1039         ioobj->ioo_bufcnt = niocount;
1040
1041         LASSERT (page_count > 0);
1042         pg_prev = pga[0];
             /* add every page to the bulk descriptor and describe it in the
              * niobuf array; a page that merges with its predecessor extends
              * the previous niobuf instead of starting a new one */
1043         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1044                 struct brw_page *pg = pga[i];
1045
1046                 LASSERT(pg->count > 0);
1047                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1048                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1049                          pg->off, pg->count);
1050 #ifdef __linux__
1051                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1052                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1053                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1054                          i, page_count,
1055                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1056                          pg_prev->pg, page_private(pg_prev->pg),
1057                          pg_prev->pg->index, pg_prev->off);
1058 #else
1059                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1060                          "i %d p_c %u\n", i, page_count);
1061 #endif
1062                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1063                         (pg->flag & OBD_BRW_SRVLOCK));
1064
1065                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1066                                       pg->count);
1067                 requested_nob += pg->count;
1068
1069                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
                             /* step back so the loop's niobuf++ lands on the
                              * same entry again */
1070                         niobuf--;
1071                         niobuf->len += pg->count;
1072                 } else {
1073                         niobuf->offset = pg->off;
1074                         niobuf->len    = pg->count;
1075                         niobuf->flags  = pg->flag;
1076                 }
1077                 pg_prev = pg;
1078         }
1079
             /* niobuf must now sit exactly niocount entries past the start of
              * the niobuf buffer, i.e. the count computed above was right */
1080         LASSERTF((void *)(niobuf - niocount) ==
1081                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1082                                niocount * sizeof(*niobuf)),
1083                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1084                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1085                 (void *)(niobuf - niocount));
1086
1087         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1088
1089         /* size[REQ_REC_OFF] still sizeof (*body) */
1090         if (opc == OST_WRITE) {
1091                 if (cli->cl_checksum) {
1092                         /* store cl_cksum_type in a local variable since
1093                          * it can be changed via lprocfs */
1094                         cksum_type_t cksum_type = cli->cl_cksum_type;
1095
1096                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1097                                 oa->o_flags = body->oa.o_flags = 0;
1098                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1099                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1100                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1101                                                              page_count, pga,
1102                                                              OST_WRITE,
1103                                                              cksum_type);
1104                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1105                                body->oa.o_cksum);
1106                         /* save this in 'oa', too, for later checking */
1107                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1108                         oa->o_flags |= cksum_type_pack(cksum_type);
1109                 } else {
1110                         /* clear out the checksum flag, in case this is a
1111                          * resend but cl_checksum is no longer set. b=11238 */
                             /* NOTE(review): only the caller's oa is cleared
                              * here; body->oa.o_valid was copied from oa
                              * earlier and is left as it was - confirm this
                              * is intended */
1112                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1113                 }
1114                 oa->o_cksum = body->oa.o_cksum;
1115                 /* 1 RC per niobuf */
1116                 size[REPLY_REC_OFF + 1] = sizeof(__u32) * niocount;
1117                 ptlrpc_req_set_repsize(req, 3, size);
1118         } else {
1119                 if (cli->cl_checksum) {
1120                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1121                                 body->oa.o_flags = 0;
1122                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1123                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1124                 }
1125                 /* 1 RC for the whole I/O */
1126                 ptlrpc_req_set_repsize(req, 2, size);
1127         }
1128
             /* stash what the reply handling needs in the request's
              * async-args area */
1129         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1130         aa = ptlrpc_req_async_args(req);
1131         aa->aa_oa = oa;
1132         aa->aa_requested_nob = requested_nob;
1133         aa->aa_nio_count = niocount;
1134         aa->aa_page_count = page_count;
1135         aa->aa_resends = 0;
1136         aa->aa_ppga = pga;
1137         aa->aa_cli = cli;
1138         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1139
1140         *reqp = req;
1141         RETURN (0);
1142
1143  out:
             /* error exit: dropping the request reference is the only
              * cleanup done here */
1144         ptlrpc_req_finished (req);
1145         RETURN (rc);
1146 }
1147
1148 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1149                                 __u32 client_cksum, __u32 server_cksum, int nob,
1150                                 obd_count page_count, struct brw_page **pga,
1151                                 cksum_type_t client_cksum_type)
1152 {
1153         __u32 new_cksum;
1154         char *msg;
1155         cksum_type_t cksum_type;
1156
1157         if (server_cksum == client_cksum) {
1158                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1159                 return 0;
1160         }
1161
1162         if (oa->o_valid & OBD_MD_FLFLAGS)
1163                 cksum_type = cksum_type_unpack(oa->o_flags);
1164         else
1165                 cksum_type = OBD_CKSUM_CRC32;
1166
1167         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1168                                       cksum_type);
1169
1170         if (cksum_type != client_cksum_type)
1171                 msg = "the server did not use the checksum type specified in "
1172                       "the original request - likely a protocol problem";
1173         else if (new_cksum == server_cksum)
1174                 msg = "changed on the client after we checksummed it - "
1175                       "likely false positive due to mmap IO (bug 11742)";
1176         else if (new_cksum == client_cksum)
1177                 msg = "changed in transit before arrival at OST";
1178         else
1179                 msg = "changed in transit AND doesn't match the original - "
1180                       "likely false positive due to mmap IO (bug 11742)";
1181
1182         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1183                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1184                            "["LPU64"-"LPU64"]\n",
1185                            msg, libcfs_nid2str(peer->nid),
1186                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1187                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1188                                                         (__u64)0,
1189                            oa->o_id,
1190                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1191                            pga[0]->off,
1192                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1193         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1194                "client csum now %x\n", client_cksum, client_cksum_type,
1195                server_cksum, cksum_type, new_cksum);
1196
1197         return 1;
1198 }
1199
1200 /* Note rc enters this function as number of bytes transferred */
1201 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1202 {
1203         struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
1204         const lnet_process_id_t *peer =
1205                         &req->rq_import->imp_connection->c_peer;
1206         struct client_obd *cli = aa->aa_cli;
1207         struct ost_body *body;
1208         __u32 client_cksum = 0;
1209         ENTRY;
1210
1211         if (rc < 0 && rc != -EDQUOT)
1212                 RETURN(rc);
1213
1214         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1215         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1216                                   lustre_swab_ost_body);
1217         if (body == NULL) {
1218                 CERROR ("Can't unpack body\n");
1219                 RETURN(-EPROTO);
1220         }
1221
1222         /* set/clear over quota flag for a uid/gid */
1223         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1224             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1225                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1226                              body->oa.o_gid, body->oa.o_valid,
1227                              body->oa.o_flags);
1228
1229         if (rc < 0)
1230                 RETURN(rc);
1231
1232         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1233                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1234
1235         osc_update_grant(cli, body);
1236
1237         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1238                 if (rc > 0) {
1239                         CERROR ("Unexpected +ve rc %d\n", rc);
1240                         RETURN(-EPROTO);
1241                 }
1242                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1243
1244                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1245                     check_write_checksum(&body->oa, peer, client_cksum,
1246                                          body->oa.o_cksum, aa->aa_requested_nob,
1247                                          aa->aa_page_count, aa->aa_ppga,
1248                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1249                         RETURN(-EAGAIN);
1250
1251                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1252                                      aa->aa_page_count, aa->aa_ppga);
1253                 GOTO(out, rc);
1254         }
1255
1256         /* The rest of this function executes only for OST_READs */
1257         if (rc > aa->aa_requested_nob) {
1258                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1259                        aa->aa_requested_nob);
1260                 RETURN(-EPROTO);
1261         }
1262
1263         if (rc != req->rq_bulk->bd_nob_transferred) {
1264                 CERROR ("Unexpected rc %d (%d transferred)\n",
1265                         rc, req->rq_bulk->bd_nob_transferred);
1266                 return (-EPROTO);
1267         }
1268
1269         if (rc < aa->aa_requested_nob)
1270                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1271
1272         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1273                 static int cksum_counter;
1274                 __u32      server_cksum = body->oa.o_cksum;
1275                 char      *via;
1276                 char      *router;
1277                 cksum_type_t cksum_type;
1278
1279                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1280                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1281                 else
1282                         cksum_type = OBD_CKSUM_CRC32;
1283                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1284                                                  aa->aa_ppga, OST_READ,
1285                                                  cksum_type);
1286
1287                 if (peer->nid == req->rq_bulk->bd_sender) {
1288                         via = router = "";
1289                 } else {
1290                         via = " via ";
1291                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1292                 }
1293
1294                 if (server_cksum == ~0 && rc > 0) {
1295                         CERROR("Protocol error: server %s set the 'checksum' "
1296                                "bit, but didn't send a checksum.  Not fatal, "
1297                                "but please notify on http://bugzilla.lustre.org/\n",
1298                                libcfs_nid2str(peer->nid));
1299                 } else if (server_cksum != client_cksum) {
1300                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1301                                            "%s%s%s inum "LPU64"/"LPU64" object "
1302                                            LPU64"/"LPU64" extent "
1303                                            "["LPU64"-"LPU64"]\n",
1304                                            req->rq_import->imp_obd->obd_name,
1305                                            libcfs_nid2str(peer->nid),
1306                                            via, router,
1307                                            body->oa.o_valid & OBD_MD_FLFID ?
1308                                                 body->oa.o_fid : (__u64)0,
1309                                            body->oa.o_valid & OBD_MD_FLFID ?
1310                                                 body->oa.o_generation :(__u64)0,
1311                                            body->oa.o_id,
1312                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1313                                                 body->oa.o_gr : (__u64)0,
1314                                            aa->aa_ppga[0]->off,
1315                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1316                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1317                                                                         1);
1318                         CERROR("client %x, server %x, cksum_type %x\n",
1319                                client_cksum, server_cksum, cksum_type);
1320                         cksum_counter = 0;
1321                         aa->aa_oa->o_cksum = client_cksum;
1322                         rc = -EAGAIN;
1323                 } else {
1324                         cksum_counter++;
1325                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1326                         rc = 0;
1327                 }
1328         } else if (unlikely(client_cksum)) {
1329                 static int cksum_missed;
1330
1331                 cksum_missed++;
1332                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1333                         CERROR("Checksum %u requested from %s but not sent\n",
1334                                cksum_missed, libcfs_nid2str(peer->nid));
1335         } else {
1336                 rc = 0;
1337         }
1338 out:
1339         if (rc >= 0)
1340                 memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
1341
1342         RETURN(rc);
1343 }
1344
1345 static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
1346                             struct lov_stripe_md *lsm,
1347                             obd_count page_count, struct brw_page **pga)
1348 {
1349         struct ptlrpc_request *request;
1350         int                    rc;
1351         cfs_waitq_t            waitq;
1352         int                    resends = 0;
1353         struct l_wait_info     lwi;
1354
1355         ENTRY;
1356         init_waitqueue_head(&waitq);
1357
1358 restart_bulk:
1359         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1360                                   page_count, pga, &request);
1361         if (rc != 0)
1362                 return (rc);
1363
1364         rc = ptlrpc_queue_wait(request);
1365
1366         if (rc == -ETIMEDOUT && request->rq_resend) {
1367                 DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
1368                 ptlrpc_req_finished(request);
1369                 goto restart_bulk;
1370         }
1371
1372         rc = osc_brw_fini_request(request, rc);
1373
1374         ptlrpc_req_finished(request);
1375         if (osc_recoverable_error(rc)) {
1376                 resends++;
1377                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1378                         CERROR("too many resend retries, returning error\n");
1379                         RETURN(-EIO);
1380                 }
1381
1382                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1383                 l_wait_event(waitq, 0, &lwi);
1384
1385                 goto restart_bulk;
1386         }
1387         RETURN(rc);
1388 }
1389
1390 int osc_brw_redo_request(struct ptlrpc_request *request,
1391                          struct osc_brw_async_args *aa)
1392 {
1393         struct ptlrpc_request *new_req;
1394         struct ptlrpc_request_set *set = request->rq_set;
1395         struct osc_brw_async_args *new_aa;
1396         struct osc_async_page *oap;
1397         int rc = 0;
1398         ENTRY;
1399
1400         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1401                 CERROR("too many resend retries, returning error\n");
1402                 RETURN(-EIO);
1403         }
1404
1405         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1406
1407         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1408                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1409                                   aa->aa_cli, aa->aa_oa,
1410                                   NULL /* lsm unused by osc currently */,
1411                                   aa->aa_page_count, aa->aa_ppga, &new_req);
1412         if (rc)
1413                 RETURN(rc);
1414
1415         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1416
1417         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1418                 if (oap->oap_request != NULL) {
1419                         LASSERTF(request == oap->oap_request,
1420                                  "request %p != oap_request %p\n",
1421                                  request, oap->oap_request);
1422                         if (oap->oap_interrupted) {
1423                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1424                                 ptlrpc_req_finished(new_req);
1425                                 RETURN(-EINTR);
1426                         }
1427                 }
1428         }
1429         /* New request takes over pga and oaps from old request.
1430          * Note that copying a list_head doesn't work, need to move it... */
1431         aa->aa_resends++;
1432         new_req->rq_interpret_reply = request->rq_interpret_reply;
1433         new_req->rq_async_args = request->rq_async_args;
1434         new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
1435
1436         new_aa = ptlrpc_req_async_args(new_req);
1437
1438         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1439         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1440         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1441
1442         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1443                 if (oap->oap_request) {
1444                         ptlrpc_req_finished(oap->oap_request);
1445                         oap->oap_request = ptlrpc_request_addref(new_req);
1446                 }
1447         }
1448
1449         /* use ptlrpc_set_add_req is safe because interpret functions work
1450          * in check_set context. only one way exist with access to request
1451          * from different thread got -EINTR - this way protected with
1452          * cl_loi_list_lock */
1453         ptlrpc_set_add_req(set, new_req);
1454
1455         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1456
1457         DEBUG_REQ(D_INFO, new_req, "new request");
1458         RETURN(0);
1459 }
1460
1461 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1462                           struct lov_stripe_md *lsm, obd_count page_count,
1463                           struct brw_page **pga, struct ptlrpc_request_set *set)
1464 {
1465         struct ptlrpc_request     *request;
1466         struct client_obd         *cli = &exp->exp_obd->u.cli;
1467         int                        rc, i;
1468         struct osc_brw_async_args *aa;
1469         ENTRY;
1470
1471         /* Consume write credits even if doing a sync write -
1472          * otherwise we may run out of space on OST due to grant. */
1473         if (cmd == OBD_BRW_WRITE) {
1474                 client_obd_list_lock(&cli->cl_loi_list_lock);
1475                 for (i = 0; i < page_count; i++) {
1476                         if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
1477                                 osc_consume_write_grant(cli, pga[i]);
1478                 }
1479                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1480         }
1481
1482         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1483                                   page_count, pga, &request);
1484
1485         CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
1486         aa = ptlrpc_req_async_args(request);
1487         if (cmd == OBD_BRW_READ) {
1488                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1489                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1490         } else {
1491                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1492                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
1493                                  cli->cl_w_in_flight);
1494         }
1495         ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
1496
1497         LASSERT(list_empty(&aa->aa_oaps));
1498
1499         if (rc == 0) {
1500                 request->rq_interpret_reply = brw_interpret;
1501                 ptlrpc_set_add_req(set, request);
1502                 client_obd_list_lock(&cli->cl_loi_list_lock);
1503                 if (cmd == OBD_BRW_READ)
1504                         cli->cl_r_in_flight++;
1505                 else
1506                         cli->cl_w_in_flight++;
1507                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1508                 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DIO_PAUSE, 3);
1509         } else if (cmd == OBD_BRW_WRITE) {
1510                 client_obd_list_lock(&cli->cl_loi_list_lock);
1511                 for (i = 0; i < page_count; i++)
1512                         osc_release_write_grant(cli, pga[i], 0);
1513                 osc_wake_cache_waiters(cli);
1514                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1515         }
1516
1517         RETURN (rc);
1518 }
1519
1520 /*
1521  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1522  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1523  * fine for our small page arrays and doesn't require allocation.  its an
1524  * insertion sort that swaps elements that are strides apart, shrinking the
1525  * stride down until its '1' and the array is sorted.
1526  */
1527 static void sort_brw_pages(struct brw_page **array, int num)
1528 {
1529         int stride, i, j;
1530         struct brw_page *tmp;
1531
1532         if (num == 1)
1533                 return;
1534         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1535                 ;
1536
1537         do {
1538                 stride /= 3;
1539                 for (i = stride ; i < num ; i++) {
1540                         tmp = array[i];
1541                         j = i;
1542                         while (j >= stride && array[j-stride]->off > tmp->off) {
1543                                 array[j] = array[j - stride];
1544                                 j -= stride;
1545                         }
1546                         array[j] = tmp;
1547                 }
1548         } while (stride > 1);
1549 }
1550
1551 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1552 {
1553         int count = 1;
1554         int offset;
1555         int i = 0;
1556
1557         LASSERT (pages > 0);
1558         offset = pg[i]->off & (~CFS_PAGE_MASK);
1559
1560         for (;;) {
1561                 pages--;
1562                 if (pages == 0)         /* that's all */
1563                         return count;
1564
1565                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1566                         return count;   /* doesn't end on page boundary */
1567
1568                 i++;
1569                 offset = pg[i]->off & (~CFS_PAGE_MASK);
1570                 if (offset != 0)        /* doesn't start on page boundary */
1571                         return count;
1572
1573                 count++;
1574         }
1575 }
1576
1577 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1578 {
1579         struct brw_page **ppga;
1580         int i;
1581
1582         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1583         if (ppga == NULL)
1584                 return NULL;
1585
1586         for (i = 0; i < count; i++)
1587                 ppga[i] = pga + i;
1588         return ppga;
1589 }
1590
1591 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1592 {
1593         LASSERT(ppga != NULL);
1594         OBD_FREE(ppga, sizeof(*ppga) * count);
1595 }
1596
1597 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1598                    obd_count page_count, struct brw_page *pga,
1599                    struct obd_trans_info *oti)
1600 {
1601         struct obdo *saved_oa = NULL;
1602         struct brw_page **ppga, **orig;
1603         struct obd_import *imp = class_exp2cliimp(exp);
1604         struct client_obd *cli = &imp->imp_obd->u.cli;
1605         int rc, page_count_orig;
1606         ENTRY;
1607
1608         if (cmd & OBD_BRW_CHECK) {
1609                 /* The caller just wants to know if there's a chance that this
1610                  * I/O can succeed */
1611
1612                 if (imp == NULL || imp->imp_invalid)
1613                         RETURN(-EIO);
1614                 RETURN(0);
1615         }
1616
1617         /* test_brw with a failed create can trip this, maybe others. */
1618         LASSERT(cli->cl_max_pages_per_rpc);
1619
1620         rc = 0;
1621
1622         orig = ppga = osc_build_ppga(pga, page_count);
1623         if (ppga == NULL)
1624                 RETURN(-ENOMEM);
1625         page_count_orig = page_count;
1626
1627         sort_brw_pages(ppga, page_count);
1628         while (page_count) {
1629                 obd_count pages_per_brw;
1630
1631                 if (page_count > cli->cl_max_pages_per_rpc)
1632                         pages_per_brw = cli->cl_max_pages_per_rpc;
1633                 else
1634                         pages_per_brw = page_count;
1635
1636                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1637
1638                 if (saved_oa != NULL) {
1639                         /* restore previously saved oa */
1640                         *oinfo->oi_oa = *saved_oa;
1641                 } else if (page_count > pages_per_brw) {
1642                         /* save a copy of oa (brw will clobber it) */
1643                         OBDO_ALLOC(saved_oa);
1644                         if (saved_oa == NULL)
1645                                 GOTO(out, rc = -ENOMEM);
1646                         *saved_oa = *oinfo->oi_oa;
1647                 }
1648
1649                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1650                                       pages_per_brw, ppga);
1651
1652                 if (rc != 0)
1653                         break;
1654
1655                 page_count -= pages_per_brw;
1656                 ppga += pages_per_brw;
1657         }
1658
1659 out:
1660         osc_release_ppga(orig, page_count_orig);
1661
1662         if (saved_oa != NULL)
1663                 OBDO_FREE(saved_oa);
1664
1665         RETURN(rc);
1666 }
1667
1668 static int osc_brw_async(int cmd, struct obd_export *exp,
1669                          struct obd_info *oinfo, obd_count page_count,
1670                          struct brw_page *pga, struct obd_trans_info *oti,
1671                          struct ptlrpc_request_set *set)
1672 {
1673         struct brw_page **ppga, **orig;
1674         int page_count_orig;
1675         int rc = 0;
1676         ENTRY;
1677
1678         if (cmd & OBD_BRW_CHECK) {
1679                 /* The caller just wants to know if there's a chance that this
1680                  * I/O can succeed */
1681                 struct obd_import *imp = class_exp2cliimp(exp);
1682
1683                 if (imp == NULL || imp->imp_invalid)
1684                         RETURN(-EIO);
1685                 RETURN(0);
1686         }
1687
1688         orig = ppga = osc_build_ppga(pga, page_count);
1689         if (ppga == NULL)
1690                 RETURN(-ENOMEM);
1691         page_count_orig = page_count;
1692
1693         sort_brw_pages(ppga, page_count);
1694         while (page_count) {
1695                 struct brw_page **copy;
1696                 obd_count pages_per_brw;
1697
1698                 pages_per_brw = min_t(obd_count, page_count,
1699                     class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
1700
1701                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1702
1703                 /* use ppga only if single RPC is going to fly */
1704                 if (pages_per_brw != page_count_orig || ppga != orig) {
1705                         OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
1706                         if (copy == NULL)
1707                                 GOTO(out, rc = -ENOMEM);
1708                         memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
1709                 } else
1710                         copy = ppga;
1711
1712                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1713                                     pages_per_brw, copy, set);
1714
1715                 if (rc != 0) {
1716                         if (copy != ppga)
1717                                 OBD_FREE(copy, pages_per_brw * sizeof(*copy));
1718                         break;
1719                 }
1720
1721                 if (copy == orig) {
1722                         /* we passed it to async_internal() which is
1723                          * now responsible for releasing memory */
1724                         orig = NULL;
1725                 }
1726
1727                 page_count -= pages_per_brw;
1728                 ppga += pages_per_brw;
1729         }
1730 out:
1731         if (orig)
1732                 osc_release_ppga(orig, page_count_orig);
1733         RETURN(rc);
1734 }
1735
1736 static void osc_check_rpcs(struct client_obd *cli);
1737
1738 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1739  * the dirty accounting.  Writeback completes or truncate happens before
1740  * writing starts.  Must be called with the loi lock held. */
1741 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1742                            int sent)
1743 {
1744         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1745 }
1746
1747 /* This maintains the lists of pending pages to read/write for a given object
1748  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1749  * to quickly find objects that are ready to send an RPC. */
1750 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1751                          int cmd)
1752 {
1753         int optimal;
1754         ENTRY;
1755
1756         if (lop->lop_num_pending == 0)
1757                 RETURN(0);
1758
1759         /* if we have an invalid import we want to drain the queued pages
1760          * by forcing them through rpcs that immediately fail and complete
1761          * the pages.  recovery relies on this to empty the queued pages
1762          * before canceling the locks and evicting down the llite pages */
1763         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1764                 RETURN(1);
1765
1766         /* stream rpcs in queue order as long as as there is an urgent page
1767          * queued.  this is our cheap solution for good batching in the case
1768          * where writepage marks some random page in the middle of the file
1769          * as urgent because of, say, memory pressure */
1770         if (!list_empty(&lop->lop_urgent)) {
1771                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1772                 RETURN(1);
1773         }
1774
1775         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1776         optimal = cli->cl_max_pages_per_rpc;
1777         if (cmd & OBD_BRW_WRITE) {
1778                 /* trigger a write rpc stream as long as there are dirtiers
1779                  * waiting for space.  as they're waiting, they're not going to
1780                  * create more pages to coallesce with what's waiting.. */
1781                 if (!list_empty(&cli->cl_cache_waiters)) {
1782                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1783                         RETURN(1);
1784                 }
1785
1786                 /* +16 to avoid triggering rpcs that would want to include pages
1787                  * that are being queued but which can't be made ready until
1788                  * the queuer finishes with the page. this is a wart for
1789                  * llite::commit_write() */
1790                 optimal += 16;
1791         }
1792         if (lop->lop_num_pending >= optimal)
1793                 RETURN(1);
1794
1795         RETURN(0);
1796 }
1797
1798 static int lop_makes_hprpc(struct loi_oap_pages *lop)
1799 {
1800         struct osc_async_page *oap;
1801         ENTRY;
1802
1803         if (list_empty(&lop->lop_urgent))
1804                 RETURN(0);
1805
1806         oap = list_entry(lop->lop_urgent.next,
1807                          struct osc_async_page, oap_urgent_item);
1808
1809         if (oap->oap_async_flags & ASYNC_HP) {
1810                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
1811                 RETURN(1);
1812         }
1813
1814         RETURN(0);
1815 }
1816
/* Make @item's membership match @should_be_on: append it to @list if it is
 * wanted but currently unlinked, unlink it if it is linked but not wanted,
 * and leave it alone otherwise.  An item counts as unlinked when
 * list_empty(item) is true. */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on) {
                if (!linked)
                        list_add_tail(item, list);
        } else if (linked) {
                list_del_init(item);
        }
}
1825
1826 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1827  * can find pages to build into rpcs quickly */
1828 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1829 {
1830         if (lop_makes_hprpc(&loi->loi_write_lop) ||
1831             lop_makes_hprpc(&loi->loi_read_lop)) {
1832                 /* HP rpc */
1833                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
1834                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1835         } else {
1836                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1837                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
1838                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
1839                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1840         }
1841
1842         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1843                 loi->loi_write_lop.lop_num_pending);
1844
1845         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1846                 loi->loi_read_lop.lop_num_pending);
1847 }
1848
1849 static void lop_update_pending(struct client_obd *cli,
1850                                struct loi_oap_pages *lop, int cmd, int delta)
1851 {
1852         lop->lop_num_pending += delta;
1853         if (cmd & OBD_BRW_WRITE)
1854                 cli->cl_pending_w_pages += delta;
1855         else
1856                 cli->cl_pending_r_pages += delta;
1857 }
1858
1859 /* this is called when a sync waiter receives an interruption.  Its job is to
1860  * get the caller woken as soon as possible.  If its page hasn't been put in an
1861  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1862  * desiring interruption which will forcefully complete the rpc once the rpc
1863  * has timed out */
1864 static void osc_occ_interrupted(struct oig_callback_context *occ)
1865 {
1866         struct osc_async_page *oap;
1867         struct loi_oap_pages *lop;
1868         struct lov_oinfo *loi;
1869         ENTRY;
1870
1871         /* XXX member_of() */
1872         oap = list_entry(occ, struct osc_async_page, oap_occ);
1873
1874         client_obd_list_lock(&oap->oap_cli->cl_loi_list_lock);
1875
1876         oap->oap_interrupted = 1;
1877
1878         /* ok, it's been put in an rpc. only one oap gets a request reference */
1879         if (oap->oap_request != NULL) {
1880                 ptlrpc_mark_interrupted(oap->oap_request);
1881                 ptlrpcd_wake(oap->oap_request);
1882                 GOTO(unlock, 0);
1883         }
1884
1885         /* we don't get interruption callbacks until osc_trigger_group_io()
1886          * has been called and put the sync oaps in the pending/urgent lists.*/
1887         if (!list_empty(&oap->oap_pending_item)) {
1888                 list_del_init(&oap->oap_pending_item);
1889                 list_del_init(&oap->oap_urgent_item);
1890
1891                 loi = oap->oap_loi;
1892                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1893                         &loi->loi_write_lop : &loi->loi_read_lop;
1894                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1895                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1896
1897                 oig_complete_one(oap->oap_oig, &oap->oap_occ, -EINTR);
1898                 oap->oap_oig = NULL;
1899         }
1900
1901 unlock:
1902         client_obd_list_unlock(&oap->oap_cli->cl_loi_list_lock);
1903 }
1904
1905 /* this is trying to propogate async writeback errors back up to the
1906  * application.  As an async write fails we record the error code for later if
1907  * the app does an fsync.  As long as errors persist we force future rpcs to be
1908  * sync so that the app can get a sync error and break the cycle of queueing
1909  * pages for which writeback will fail. */
1910 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1911                            int rc)
1912 {
1913         if (rc) {
1914                 if (!ar->ar_rc)
1915                         ar->ar_rc = rc;
1916
1917                 ar->ar_force_sync = 1;
1918                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1919                 return;
1920
1921         }
1922
1923         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1924                 ar->ar_force_sync = 0;
1925 }
1926
1927 static void osc_oap_to_pending(struct osc_async_page *oap)
1928 {
1929         struct loi_oap_pages *lop;
1930
1931         if (oap->oap_cmd & OBD_BRW_WRITE)
1932                 lop = &oap->oap_loi->loi_write_lop;
1933         else
1934                 lop = &oap->oap_loi->loi_read_lop;
1935
1936         if (oap->oap_async_flags & ASYNC_HP)
1937                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
1938         else if (oap->oap_async_flags & ASYNC_URGENT)
1939                 list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
1940         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
1941         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
1942 }
1943
1944 /* this must be called holding the loi list lock to give coverage to exit_cache,
1945  * async_flag maintenance, and oap_request */
1946 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
1947                               struct osc_async_page *oap, int sent, int rc)
1948 {
1949         __u64 xid = 0;
1950
1951         ENTRY;
1952         if (oap->oap_request != NULL) {
1953                 xid = ptlrpc_req_xid(oap->oap_request);
1954                 ptlrpc_req_finished(oap->oap_request);
1955                 oap->oap_request = NULL;
1956         }
1957
1958         oap->oap_async_flags = 0;
1959         oap->oap_interrupted = 0;
1960
1961         if (oap->oap_cmd & OBD_BRW_WRITE) {
1962                 osc_process_ar(&cli->cl_ar, xid, rc);
1963                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
1964         }
1965
1966         if (rc == 0 && oa != NULL) {
1967                 if (oa->o_valid & OBD_MD_FLBLOCKS)
1968                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
1969                 if (oa->o_valid & OBD_MD_FLMTIME)
1970                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
1971                 if (oa->o_valid & OBD_MD_FLATIME)
1972                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
1973                 if (oa->o_valid & OBD_MD_FLCTIME)
1974                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
1975         }
1976
1977         if (oap->oap_oig) {
1978                 osc_exit_cache(cli, oap, sent);
1979                 oig_complete_one(oap->oap_oig, &oap->oap_occ, rc);
1980                 oap->oap_oig = NULL;
1981                 EXIT;
1982                 return;
1983         }
1984
1985         rc = oap->oap_caller_ops->ap_completion(oap->oap_caller_data,
1986                                                 oap->oap_cmd, oa, rc);
1987
1988         /* ll_ap_completion (from llite) drops PG_locked. so, a new
1989          * I/O on the page could start, but OSC calls it under lock
1990          * and thus we can add oap back to pending safely */
1991         if (rc)
1992                 /* upper layer wants to leave the page on pending queue */
1993                 osc_oap_to_pending(oap);
1994         else
1995                 osc_exit_cache(cli, oap, sent);
1996         EXIT;
1997 }
1998
1999 static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
2000 {
2001         struct osc_brw_async_args *aa = data;
2002         struct client_obd *cli;
2003         ENTRY;
2004
2005         rc = osc_brw_fini_request(request, rc);
2006         CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
2007
2008         if (osc_recoverable_error(rc)) {
2009                 rc = osc_brw_redo_request(request, aa);
2010                 if (rc == 0)
2011                         RETURN(0);
2012         }
2013
2014         cli = aa->aa_cli;
2015         client_obd_list_lock(&cli->cl_loi_list_lock);
2016         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2017          * is called so we know whether to go to sync BRWs or wait for more
2018          * RPCs to complete */
2019         if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
2020                 cli->cl_w_in_flight--;
2021         else
2022                 cli->cl_r_in_flight--;
2023
2024         if (!list_empty(&aa->aa_oaps)) { /* from osc_send_oap_rpc() */
2025                 struct osc_async_page *oap, *tmp;
2026                 /* the caller may re-use the oap after the completion call so
2027                  * we need to clean it up a little */
2028                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2029                         list_del_init(&oap->oap_rpc_item);
2030                         osc_ap_completion(cli, aa->aa_oa, oap, 1, rc);
2031                 }
2032                 OBDO_FREE(aa->aa_oa);
2033         } else { /* from async_internal() */
2034                 int i;
2035                 for (i = 0; i < aa->aa_page_count; i++)
2036                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2037         }
2038         osc_wake_cache_waiters(cli);
2039         osc_check_rpcs(cli);
2040         client_obd_list_unlock(&cli->cl_loi_list_lock);
2041
2042         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2043         RETURN(rc);
2044 }
2045
2046 static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
2047                                             struct list_head *rpc_list,
2048                                             int page_count, int cmd)
2049 {
2050         struct ptlrpc_request *req;
2051         struct brw_page **pga = NULL;
2052         struct osc_brw_async_args *aa;
2053         struct obdo *oa = NULL;
2054         struct obd_async_page_ops *ops = NULL;
2055         void *caller_data = NULL;
2056         struct osc_async_page *oap;
2057         struct ldlm_lock *lock = NULL;
2058         obd_valid valid;
2059         int i, rc;
2060
2061         ENTRY;
2062         LASSERT(!list_empty(rpc_list));
2063
2064         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2065         if (pga == NULL)
2066                 RETURN(ERR_PTR(-ENOMEM));
2067
2068         OBDO_ALLOC(oa);
2069         if (oa == NULL)
2070                 GOTO(out, req = ERR_PTR(-ENOMEM));
2071
2072         i = 0;
2073         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2074                 if (ops == NULL) {
2075                         ops = oap->oap_caller_ops;
2076                         caller_data = oap->oap_caller_data;
2077                         lock = oap->oap_ldlm_lock;
2078                 }
2079                 pga[i] = &oap->oap_brw_page;
2080                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2081                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2082                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2083                 i++;
2084         }
2085
2086         /* always get the data for the obdo for the rpc */
2087         LASSERT(ops != NULL);
2088         ops->ap_fill_obdo(caller_data, cmd, oa);
2089         if (lock) {
2090                 oa->o_handle = lock->l_remote_handle;
2091                 oa->o_valid |= OBD_MD_FLHANDLE;
2092         }
2093
2094         sort_brw_pages(pga, page_count);
2095         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
2096         if (rc != 0) {
2097                 CERROR("prep_req failed: %d\n", rc);
2098                 GOTO(out, req = ERR_PTR(rc));
2099         }
2100         oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
2101                                                  sizeof(struct ost_body)))->oa;
2102
2103         /* Need to update the timestamps after the request is built in case
2104          * we race with setattr (locally or in queue at OST).  If OST gets
2105          * later setattr before earlier BRW (as determined by the request xid),
2106          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2107          * way to do this in a single call.  bug 10150 */
2108         if (pga[0]->flag & OBD_BRW_SRVLOCK) {
2109                 /* in case of lockless read/write do not use inode's
2110                  * timestamps because concurrent stat might fill the
2111                  * inode with out-of-date times, send current
2112                  * instead */
2113                 if (cmd & OBD_BRW_WRITE) {
2114                         oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
2115                         oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2116                         valid = OBD_MD_FLATIME;
2117                 } else {
2118                         oa->o_atime = LTIME_S(CURRENT_TIME);
2119                         oa->o_valid |= OBD_MD_FLATIME;
2120                         valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
2121                 }
2122         } else {
2123                 valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2124         }
2125         ops->ap_update_obdo(caller_data, cmd, oa, valid);
2126
2127         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2128         aa = ptlrpc_req_async_args(req);
2129         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2130         list_splice(rpc_list, &aa->aa_oaps);
2131         CFS_INIT_LIST_HEAD(rpc_list);
2132
2133 out:
2134         if (IS_ERR(req)) {
2135                 if (oa)
2136                         OBDO_FREE(oa);
2137                 if (pga)
2138                         OBD_FREE(pga, sizeof(*pga) * page_count);
2139         }
2140         RETURN(req);
2141 }
2142
2143 /* the loi lock is held across this function but it's allowed to release
2144  * and reacquire it during its work */
/**
 * Prepare pages for ASYNC io and put the pages in the send queue.
 *
 * \param cli -
 * \param loi -
 * \param cmd - OBD_BRW_* macros
 * \param lop - pending pages
 *
 * \return zero if the pages were successfully added to the send queue.
 * \return non-zero if an error occurred.
 */
2156 static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
2157                             int cmd, struct loi_oap_pages *lop)
2158 {
2159         struct ptlrpc_request *req;
2160         obd_count page_count = 0;
2161         struct osc_async_page *oap = NULL, *tmp;
2162         struct osc_brw_async_args *aa;
2163         struct obd_async_page_ops *ops;
2164         CFS_LIST_HEAD(rpc_list);
2165         unsigned int ending_offset;
2166         unsigned  starting_offset = 0;
2167         int srvlock = 0;
2168         ENTRY;
2169
2170         /* If there are HP OAPs we need to handle at least 1 of them,
2171          * move it the beginning of the pending list for that. */
2172         if (!list_empty(&lop->lop_urgent)) {
2173                 oap = list_entry(lop->lop_urgent.next,
2174                                  struct osc_async_page, oap_urgent_item);
2175                 if (oap->oap_async_flags & ASYNC_HP)
2176                         list_move(&oap->oap_pending_item, &lop->lop_pending);
2177         }
2178
2179         /* first we find the pages we're allowed to work with */
2180         list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
2181                 ops = oap->oap_caller_ops;
2182
2183                 LASSERT(oap->oap_magic == OAP_MAGIC);
2184
2185                 if (page_count != 0 &&
2186                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2187                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2188                                " oap %p, page %p, srvlock %u\n",
2189                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2190                         break;
2191                 }
2192                 /* in llite being 'ready' equates to the page being locked
2193                  * until completion unlocks it.  commit_write submits a page
2194                  * as not ready because its unlock will happen unconditionally
2195                  * as the call returns.  if we race with commit_write giving
2196                  * us that page we dont' want to create a hole in the page
2197                  * stream, so we stop and leave the rpc to be fired by
2198                  * another dirtier or kupdated interval (the not ready page
2199                  * will still be on the dirty list).  we could call in
2200                  * at the end of ll_file_write to process the queue again. */
2201                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2202                         int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
2203                         if (rc < 0)
2204                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2205                                                 "instead of ready\n", oap,
2206                                                 oap->oap_page, rc);
2207                         switch (rc) {
2208                         case -EAGAIN:
2209                                 /* llite is telling us that the page is still
2210                                  * in commit_write and that we should try
2211                                  * and put it in an rpc again later.  we
2212                                  * break out of the loop so we don't create
2213                                  * a hole in the sequence of pages in the rpc
2214                                  * stream.*/
2215                                 oap = NULL;
2216                                 break;
2217                         case -EINTR:
2218                                 /* the io isn't needed.. tell the checks
2219                                  * below to complete the rpc with EINTR */
2220                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2221                                 oap->oap_count = -EINTR;
2222                                 break;
2223                         case 0:
2224                                 oap->oap_async_flags |= ASYNC_READY;
2225                                 break;
2226                         default:
2227                                 LASSERTF(0, "oap %p page %p returned %d "
2228                                             "from make_ready\n", oap,
2229                                             oap->oap_page, rc);
2230                                 break;
2231                         }
2232                 }
2233                 if (oap == NULL)
2234                         break;
2235                 /*
2236                  * Page submitted for IO has to be locked. Either by
2237                  * ->ap_make_ready() or by higher layers.
2238                  */
2239 #if defined(__KERNEL__) && defined(__linux__)
2240                  if(!(PageLocked(oap->oap_page) &&
2241                      (CheckWriteback(oap->oap_page, cmd) || oap->oap_oig !=NULL))) {
2242                         CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2243                                oap->oap_page, (long)oap->oap_page->flags, oap->oap_async_flags);
2244                         LBUG();
2245                 }
2246 #endif
2247                 /* If there is a gap at the start of this page, it can't merge
2248                  * with any previous page, so we'll hand the network a
2249                  * "fragmented" page array that it can't transfer in 1 RDMA */
2250                 if (page_count != 0 && oap->oap_page_off != 0)
2251                         break;
2252
2253                 /* take the page out of our book-keeping */
2254                 list_del_init(&oap->oap_pending_item);
2255                 lop_update_pending(cli, lop, cmd, -1);
2256                 list_del_init(&oap->oap_urgent_item);
2257
2258                 if (page_count == 0)
2259                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2260                                           (PTLRPC_MAX_BRW_SIZE - 1);
2261
2262                 /* ask the caller for the size of the io as the rpc leaves. */
2263                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
2264                         oap->oap_count =
2265                                 ops->ap_refresh_count(oap->oap_caller_data,cmd);
2266                 if (oap->oap_count <= 0) {
2267                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2268                                oap->oap_count);
2269                         osc_ap_completion(cli, NULL, oap, 0, oap->oap_count);
2270                         continue;
2271                 }
2272
2273                 /* now put the page back in our accounting */
2274                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2275                 if (page_count == 0)
2276                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2277                 if (++page_count >= cli->cl_max_pages_per_rpc)
2278                         break;
2279
2280                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2281                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2282                  * have the same alignment as the initial writes that allocated
2283                  * extents on the server. */
2284                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2285                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2286                 if (ending_offset == 0)
2287                         break;
2288
2289                 /* If there is a gap at the end of this page, it can't merge
2290                  * with any subsequent pages, so we'll hand the network a
2291                  * "fragmented" page array that it can't transfer in 1 RDMA */
2292                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2293                         break;
2294         }
2295
2296         osc_wake_cache_waiters(cli);
2297
2298         if (page_count == 0)
2299                 RETURN(0);
2300
2301         loi_list_maint(cli, loi);
2302
2303         client_obd_list_unlock(&cli->cl_loi_list_lock);
2304
2305         req = osc_build_req(cli, &rpc_list, page_count, cmd);
2306         if (IS_ERR(req)) {
2307                 /* this should happen rarely and is pretty bad, it makes the
2308                  * pending list not follow the dirty order */
2309                 client_obd_list_lock(&cli->cl_loi_list_lock);
2310                 list_for_each_entry_safe(oap, tmp, &rpc_list, oap_rpc_item) {
2311                         list_del_init(&oap->oap_rpc_item);
2312
2313                         /* queued sync pages can be torn down while the pages
2314                          * were between the pending list and the rpc */
2315                         if (oap->oap_interrupted) {
2316                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2317                                 osc_ap_completion(cli, NULL, oap, 0,
2318                                                   oap->oap_count);
2319                                 continue;
2320                         }
2321                         osc_ap_completion(cli, NULL, oap, 0, PTR_ERR(req));
2322                 }
2323                 loi_list_maint(cli, loi);
2324                 RETURN(PTR_ERR(req));
2325         }
2326
2327         aa = ptlrpc_req_async_args(req);
2328         if (cmd == OBD_BRW_READ) {
2329                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2330                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2331                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2332                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2333         } else {
2334                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2335                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2336                                  cli->cl_w_in_flight);
2337                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2338                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2339         }
2340         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2341
2342         client_obd_list_lock(&cli->cl_loi_list_lock);
2343
2344         if (cmd == OBD_BRW_READ)
2345                 cli->cl_r_in_flight++;
2346         else
2347                 cli->cl_w_in_flight++;
2348
2349         /* queued sync pages can be torn down while the pages
2350          * were between the pending list and the rpc */
2351         tmp = NULL;
2352         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2353                 /* only one oap gets a request reference */
2354                 if (tmp == NULL)
2355                         tmp = oap;
2356                 if (oap->oap_interrupted && !req->rq_intr) {
2357                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2358                                oap, req);
2359                         ptlrpc_mark_interrupted(req);
2360                 }
2361         }
2362         if (tmp != NULL)
2363                 tmp->oap_request = ptlrpc_request_addref(req);
2364
2365         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2366                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2367
2368         req->rq_interpret_reply = brw_interpret;
2369         ptlrpcd_add_req(req);
2370         RETURN(1);
2371 }
2372
/* Emit a D_INODE debug line describing an object's queue state, followed by a
 * caller-supplied message.  The fixed prefix reports whether the object is on
 * either ready list, and for the write and read queues the number of pending
 * pages and whether the urgent list is non-empty.
 *
 * STR must be a string literal (it is pasted onto the fixed format) and at
 * least one argument must follow it, since "args" is expanded after a comma.
 *
 * The final line deliberately carries no line continuation so the macro does
 * not swallow whatever line happens to follow it. */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_ready_item) ||                    \
               !list_empty(&(LOI)->loi_hp_ready_item),                   \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2382
2383 /* This is called by osc_check_rpcs() to find which objects have pages that
2384  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2385 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2386 {
2387         ENTRY;
2388         /* First return objects that have blocked locks so that they
2389          * will be flushed quickly and other clients can get the lock,
2390          * then objects which have pages ready to be stuffed into RPCs */
2391         if (!list_empty(&cli->cl_loi_hp_ready_list))
2392                 RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
2393                                   struct lov_oinfo, loi_hp_ready_item));
2394         if (!list_empty(&cli->cl_loi_ready_list))
2395                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2396                                   struct lov_oinfo, loi_ready_item));
2397
2398         /* then if we have cache waiters, return all objects with queued
2399          * writes.  This is especially important when many small files
2400          * have filled up the cache and not been fired into rpcs because
2401          * they don't pass the nr_pending/object threshhold */
2402         if (!list_empty(&cli->cl_cache_waiters) &&
2403             !list_empty(&cli->cl_loi_write_list))
2404                 RETURN(list_entry(cli->cl_loi_write_list.next,
2405                                   struct lov_oinfo, loi_write_item));
2406
2407         /* then return all queued objects when we have an invalid import
2408          * so that they get flushed */
2409         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2410                 if (!list_empty(&cli->cl_loi_write_list))
2411                         RETURN(list_entry(cli->cl_loi_write_list.next,
2412                                           struct lov_oinfo, loi_write_item));
2413                 if (!list_empty(&cli->cl_loi_read_list))
2414                         RETURN(list_entry(cli->cl_loi_read_list.next,
2415                                           struct lov_oinfo, loi_read_item));
2416         }
2417         RETURN(NULL);
2418 }
2419
2420 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2421 {
2422         struct osc_async_page *oap;
2423         int hprpc = 0;
2424
2425         if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
2426                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2427                                  struct osc_async_page, oap_urgent_item);
2428                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2429         }
2430
2431         if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
2432                 oap = list_entry(loi->loi_write_lop.lop_urgent.next,
2433                                  struct osc_async_page, oap_urgent_item);
2434                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2435         }
2436
2437         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2438 }
2439
2440 /* called with the loi list lock held */
2441 static void osc_check_rpcs(struct client_obd *cli)
2442 {
2443         struct lov_oinfo *loi;
2444         int rc = 0, race_counter = 0;
2445         ENTRY;
2446
2447         while ((loi = osc_next_loi(cli)) != NULL) {
2448                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2449
2450                 if (osc_max_rpc_in_flight(cli, loi))
2451                         break;
2452
2453                 /* attempt some read/write balancing by alternating between
2454                  * reads and writes in an object.  The makes_rpc checks here
2455                  * would be redundant if we were getting read/write work items
2456                  * instead of objects.  we don't want send_oap_rpc to drain a
2457                  * partial read pending queue when we're given this object to
2458                  * do io on writes while there are cache waiters */
2459                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2460                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_WRITE,
2461                                               &loi->loi_write_lop);
2462                         if (rc < 0)
2463                                 break;
2464                         if (rc > 0)
2465                                 race_counter = 0;
2466                         else
2467                                 race_counter++;
2468                 }
2469                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2470                         rc = osc_send_oap_rpc(cli, loi, OBD_BRW_READ,
2471                                               &loi->loi_read_lop);
2472                         if (rc < 0)
2473                                 break;
2474                         if (rc > 0)
2475                                 race_counter = 0;
2476                         else
2477                                 race_counter++;
2478                 }
2479
2480                 /* attempt some inter-object balancing by issueing rpcs
2481                  * for each object in turn */
2482                 if (!list_empty(&loi->loi_hp_ready_item))
2483                         list_del_init(&loi->loi_hp_ready_item);
2484                 if (!list_empty(&loi->loi_ready_item))
2485                         list_del_init(&loi->loi_ready_item);
2486                 if (!list_empty(&loi->loi_write_item))
2487                         list_del_init(&loi->loi_write_item);
2488                 if (!list_empty(&loi->loi_read_item))
2489                         list_del_init(&loi->loi_read_item);
2490
2491                 loi_list_maint(cli, loi);
2492
2493                 /* send_oap_rpc fails with 0 when make_ready tells it to
2494                  * back off.  llite's make_ready does this when it tries
2495                  * to lock a page queued for write that is already locked.
2496                  * we want to try sending rpcs from many objects, but we
2497                  * don't want to spin failing with 0.  */
2498                 if (race_counter == 10)
2499                         break;
2500         }
2501         EXIT;
2502 }
2503
2504 /* we're trying to queue a page in the osc so we're subject to the
2505  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2506  * If the osc's queued pages are already at that limit, then we want to sleep
2507  * until there is space in the osc's queue for us.  We also may be waiting for
2508  * write credits from the OST if there are RPCs in flight that may return some
2509  * before we fall back to sync writes.
2510  *
2511  * We need this know our allocation was granted in the presence of signals */
2512 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2513 {
2514         int rc;
2515         ENTRY;
2516         client_obd_list_lock(&cli->cl_loi_list_lock);
2517         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2518         client_obd_list_unlock(&cli->cl_loi_list_lock);
2519         RETURN(rc);
2520 };
2521
2522 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2523  * grant or cache space. */
2524 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
2525                            struct osc_async_page *oap)
2526 {
2527         struct osc_cache_waiter ocw;
2528         struct l_wait_info lwi = { 0 };
2529         ENTRY;
2530
2531         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2532                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2533                cli->cl_dirty_max, obd_max_dirty_pages,
2534                cli->cl_lost_grant, cli->cl_avail_grant);
2535
2536         /* force the caller to try sync io.  this can jump the list
2537          * of queued writes and create a discontiguous rpc stream */
2538         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2539             loi->loi_ar.ar_force_sync)
2540                 RETURN(-EDQUOT);
2541
2542         /* Hopefully normal case - cache space and write credits available */
2543         if ((cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max) &&
2544             (atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) &&
2545             (cli->cl_avail_grant >= CFS_PAGE_SIZE)) {
2546                 /* account for ourselves */
2547                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2548                 RETURN(0);
2549         }
2550
2551         /* Make sure that there are write rpcs in flight to wait for.  This
2552          * is a little silly as this object may not have any pending but
2553          * other objects sure might. */
2554         if (cli->cl_w_in_flight) {
2555                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2556                 cfs_waitq_init(&ocw.ocw_waitq);
2557                 ocw.ocw_oap = oap;
2558                 ocw.ocw_rc = 0;
2559
2560                 loi_list_maint(cli, loi);
2561                 osc_check_rpcs(cli);
2562                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2563
2564                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2565                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2566
2567                 client_obd_list_lock(&cli->cl_loi_list_lock);
2568                 if (!list_empty(&ocw.ocw_entry)) {
2569                         list_del(&ocw.ocw_entry);
2570                         RETURN(-EINTR);
2571                 }
2572                 RETURN(ocw.ocw_rc);
2573         }
2574
2575         RETURN(-EDQUOT);
2576 }
2577
2578 static int osc_reget_short_lock(struct obd_export *exp,
2579                                 struct lov_stripe_md *lsm,
2580                                 void **res, int rw,
2581                                 obd_off start, obd_off end,
2582                                 void **cookie)
2583 {
2584         struct osc_async_page *oap = *res;
2585         int rc;
2586
2587         ENTRY;
2588
2589         spin_lock(&oap->oap_lock);
2590         rc = ldlm_lock_fast_match(oap->oap_ldlm_lock, rw,
2591                                   start, end, cookie);
2592         spin_unlock(&oap->oap_lock);
2593
2594         RETURN(rc);
2595 }
2596
2597 static int osc_release_short_lock(struct obd_export *exp,
2598                                   struct lov_stripe_md *lsm, obd_off end,
2599                                   void *cookie, int rw)
2600 {
2601         ENTRY;
2602         ldlm_lock_fast_release(cookie, rw);
2603         /* no error could have happened at this layer */
2604         RETURN(0);
2605 }
2606
2607 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2608                         struct lov_oinfo *loi, cfs_page_t *page,
2609                         obd_off offset, struct obd_async_page_ops *ops,
2610                         void *data, void **res, int nocache,
2611                         struct lustre_handle *lockh)
2612 {
2613         struct osc_async_page *oap;
2614         struct ldlm_res_id oid = {{0}};
2615         int rc = 0;
2616
2617         ENTRY;
2618
2619         if (!page)
2620                 return size_round(sizeof(*oap));
2621
2622         oap = *res;
2623         oap->oap_magic = OAP_MAGIC;
2624         oap->oap_cli = &exp->exp_obd->u.cli;
2625         oap->oap_loi = loi;
2626
2627         oap->oap_caller_ops = ops;
2628         oap->oap_caller_data = data;
2629
2630         oap->oap_page = page;
2631         oap->oap_obj_off = offset;
2632
2633         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2634         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2635         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2636         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2637
2638         oap->oap_occ.occ_interrupted = osc_occ_interrupted;
2639
2640         spin_lock_init(&oap->oap_lock);
2641
2642         /* If the page was marked as notcacheable - don't add to any locks */
2643         if (!nocache) {
2644                 osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
2645                 /* This is the only place where we can call cache_add_extent
2646                    without oap_lock, because this page is locked now, and
2647                    the lock we are adding it to is referenced, so cannot lose
2648                    any pages either. */
2649                 rc = cache_add_extent(oap->oap_cli->cl_cache, &oid, oap, lockh);
2650                 if (rc)
2651                         RETURN(rc);
2652         }
2653
2654         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2655         RETURN(0);
2656 }
2657
2658 struct osc_async_page *oap_from_cookie(void *cookie)
2659 {
2660         struct osc_async_page *oap = cookie;
2661         if (oap->oap_magic != OAP_MAGIC)
2662                 return ERR_PTR(-EINVAL);
2663         return oap;
2664 };
2665
2666 static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2667                               struct lov_oinfo *loi, void *cookie,
2668                               int cmd, obd_off off, int count,
2669                               obd_flag brw_flags, enum async_flags async_flags)
2670 {
2671         struct client_obd *cli = &exp->exp_obd->u.cli;
2672         struct osc_async_page *oap;
2673         int rc = 0;
2674         ENTRY;
2675
2676         oap = oap_from_cookie(cookie);
2677         if (IS_ERR(oap))
2678                 RETURN(PTR_ERR(oap));
2679
2680         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2681                 RETURN(-EIO);
2682
2683         if (!list_empty(&oap->oap_pending_item) ||
2684             !list_empty(&oap->oap_urgent_item) ||
2685             !list_empty(&oap->oap_rpc_item))
2686                 RETURN(-EBUSY);
2687
2688         /* check if the file's owner/group is over quota */
2689         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
2690                 struct obd_async_page_ops *ops;
2691                 struct obdo *oa;
2692
2693                 OBDO_ALLOC(oa);
2694                 if (oa == NULL)
2695                         RETURN(-ENOMEM);
2696
2697                 ops = oap->oap_caller_ops;
2698                 ops->ap_fill_obdo(oap->oap_caller_data, cmd, oa);
2699                 if (lquota_chkdq(quota_interface, cli, oa->o_uid, oa->o_gid) ==
2700                     NO_QUOTA)
2701                         rc = -EDQUOT;
2702
2703                 OBDO_FREE(oa);
2704                 if (rc)
2705                         RETURN(rc);
2706         }
2707
2708         if (loi == NULL)
2709                 loi = lsm->lsm_oinfo[0];
2710
2711         client_obd_list_lock(&cli->cl_loi_list_lock);
2712
2713         oap->oap_cmd = cmd;
2714         oap->oap_page_off = off;
2715         oap->oap_count = count;
2716         oap->oap_brw_flags = brw_flags;
2717         oap->oap_async_flags = async_flags;
2718
2719         if (cmd & OBD_BRW_WRITE) {
2720                 rc = osc_enter_cache(cli, loi, oap);
2721                 if (rc) {
2722                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2723                         RETURN(rc);
2724                 }
2725         }
2726
2727         osc_oap_to_pending(oap);
2728         loi_list_maint(cli, loi);
2729
2730         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2731                   cmd);
2732
2733         osc_check_rpcs(cli);
2734         client_obd_list_unlock(&cli->cl_loi_list_lock);
2735
2736         RETURN(0);
2737 }
2738
/* True when none of the bits in "flag" are set in "was" but at least one of
 * them is set in "now", i.e. the flag is being newly set.  For a single-bit
 * flag this is (~was & now & flag), but this is more clear :)
 *
 * Every argument is parenthesized so that expressions such as "A | B" can be
 * passed without being re-associated by operator precedence. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2741
2742 static int osc_set_async_flags(struct obd_export *exp,
2743                                struct lov_stripe_md *lsm,
2744                                struct lov_oinfo *loi, void *cookie,
2745                                obd_flag async_flags)
2746 {
2747         struct client_obd *cli = &exp->exp_obd->u.cli;
2748         struct loi_oap_pages *lop;
2749         struct osc_async_page *oap;
2750         int rc = 0;
2751         ENTRY;
2752
2753         oap = oap_from_cookie(cookie);
2754         if (IS_ERR(oap))
2755                 RETURN(PTR_ERR(oap));
2756
2757         /*
2758          * bug 7311: OST-side locking is only supported for liblustre for now
2759          * (and liblustre never calls obd_set_async_flags(). I hope.), generic
2760          * implementation has to handle case where OST-locked page was picked
2761          * up by, e.g., ->writepage().
2762          */
2763         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
2764         LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
2765                                      * tread here. */
2766
2767         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2768                 RETURN(-EIO);
2769
2770         if (loi == NULL)
2771                 loi = lsm->lsm_oinfo[0];
2772
2773         if (oap->oap_cmd & OBD_BRW_WRITE) {
2774                 lop = &loi->loi_write_lop;
2775         } else {
2776                 lop = &loi->loi_read_lop;
2777         }
2778
2779         client_obd_list_lock(&cli->cl_loi_list_lock);
2780
2781         if (list_empty(&oap->oap_pending_item))
2782                 GOTO(out, rc = -EINVAL);
2783
2784         if ((oap->oap_async_flags & async_flags) == async_flags)
2785                 GOTO(out, rc = 0);
2786
2787         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2788                 oap->oap_async_flags |= ASYNC_READY;
2789
2790         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
2791             list_empty(&oap->oap_rpc_item)) {
2792                 if (oap->oap_async_flags & ASYNC_HP)
2793                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2794                 else
2795                         list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2796                 oap->oap_async_flags |= ASYNC_URGENT;
2797                 loi_list_maint(cli, loi);
2798         }
2799
2800         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2801                         oap->oap_async_flags);
2802 out:
2803         osc_check_rpcs(cli);
2804         client_obd_list_unlock(&cli->cl_loi_list_lock);
2805         RETURN(rc);
2806 }
2807
2808 static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm,
2809                              struct lov_oinfo *loi,
2810                              struct obd_io_group *oig, void *cookie,
2811                              int cmd, obd_off off, int count,
2812                              obd_flag brw_flags,
2813                              obd_flag async_flags)
2814 {
2815         struct client_obd *cli = &exp->exp_obd->u.cli;
2816         struct osc_async_page *oap;
2817         struct loi_oap_pages *lop;
2818         int rc = 0;
2819         ENTRY;
2820
2821         oap = oap_from_cookie(cookie);
2822         if (IS_ERR(oap))
2823                 RETURN(PTR_ERR(oap));
2824
2825         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2826                 RETURN(-EIO);
2827
2828         if (!list_empty(&oap->oap_pending_item) ||
2829             !list_empty(&oap->oap_urgent_item) ||
2830             !list_empty(&oap->oap_rpc_item))
2831                 RETURN(-EBUSY);
2832
2833         if (loi == NULL)
2834                 loi = lsm->lsm_oinfo[0];
2835
2836         client_obd_list_lock(&cli->cl_loi_list_lock);
2837
2838         oap->oap_cmd = cmd;
2839         oap->oap_page_off = off;
2840         oap->oap_count = count;
2841         oap->oap_brw_flags = brw_flags;
2842         oap->oap_async_flags = async_flags;
2843
2844         if (cmd & OBD_BRW_WRITE)
2845                 lop = &loi->loi_write_lop;
2846         else
2847                 lop = &loi->loi_read_lop;
2848
2849         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_group);
2850         if (oap->oap_async_flags & ASYNC_GROUP_SYNC) {
2851                 oap->oap_oig = oig;
2852                 rc = oig_add_one(oig, &oap->oap_occ);
2853         }
2854
2855         LOI_DEBUG(loi, "oap %p page %p on group pending: rc %d\n",
2856                   oap, oap->oap_page, rc);
2857
2858         client_obd_list_unlock(&cli->cl_loi_list_lock);
2859
2860         RETURN(rc);
2861 }
2862
2863 static void osc_group_to_pending(struct client_obd *cli, struct lov_oinfo *loi,
2864                                  struct loi_oap_pages *lop, int cmd)
2865 {
2866         struct list_head *pos, *tmp;
2867         struct osc_async_page *oap;
2868
2869         list_for_each_safe(pos, tmp, &lop->lop_pending_group) {
2870                 oap = list_entry(pos, struct osc_async_page, oap_pending_item);
2871                 list_del(&oap->oap_pending_item);
2872                 osc_oap_to_pending(oap);
2873         }
2874         loi_list_maint(cli, loi);
2875 }
2876
2877 static int osc_trigger_group_io(struct obd_export *exp,
2878                                 struct lov_stripe_md *lsm,
2879                                 struct lov_oinfo *loi,
2880                                 struct obd_io_group *oig)
2881 {
2882         struct client_obd *cli = &exp->exp_obd->u.cli;
2883         ENTRY;
2884
2885         if (loi == NULL)
2886                 loi = lsm->lsm_oinfo[0];
2887
2888         client_obd_list_lock(&cli->cl_loi_list_lock);
2889
2890         osc_group_to_pending(cli, loi, &loi->loi_write_lop, OBD_BRW_WRITE);
2891         osc_group_to_pending(cli, loi, &loi->loi_read_lop, OBD_BRW_READ);
2892
2893         osc_check_rpcs(cli);
2894         client_obd_list_unlock(&cli->cl_loi_list_lock);
2895
2896         RETURN(0);
2897 }
2898
2899 static int osc_teardown_async_page(struct obd_export *exp,
2900                                    struct lov_stripe_md *lsm,
2901                                    struct lov_oinfo *loi, void *cookie)
2902 {
2903         struct client_obd *cli = &exp->exp_obd->u.cli;
2904         struct loi_oap_pages *lop;
2905         struct osc_async_page *oap;
2906         int rc = 0;
2907         ENTRY;
2908
2909         oap = oap_from_cookie(cookie);
2910         if (IS_ERR(oap))
2911                 RETURN(PTR_ERR(oap));
2912
2913         if (loi == NULL)
2914                 loi = lsm->lsm_oinfo[0];
2915
2916         if (oap->oap_cmd & OBD_BRW_WRITE) {
2917                 lop = &loi->loi_write_lop;
2918         } else {
2919                 lop = &loi->loi_read_lop;
2920         }
2921
2922         client_obd_list_lock(&cli->cl_loi_list_lock);
2923
2924         if (!list_empty(&oap->oap_rpc_item))
2925                 GOTO(out, rc = -EBUSY);
2926
2927         osc_exit_cache(cli, oap, 0);
2928         osc_wake_cache_waiters(cli);
2929
2930         if (!list_empty(&oap->oap_urgent_item)) {
2931                 list_del_init(&oap->oap_urgent_item);
2932                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
2933         }
2934
2935         if (!list_empty(&oap->oap_pending_item)) {
2936                 list_del_init(&oap->oap_pending_item);
2937                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2938         }
2939         loi_list_maint(cli, loi);
2940         cache_remove_extent(cli->cl_cache, oap);
2941
2942         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2943 out:
2944         client_obd_list_unlock(&cli->cl_loi_list_lock);
2945         RETURN(rc);
2946 }
2947
2948 int osc_extent_blocking_cb(struct ldlm_lock *lock,
2949                            struct ldlm_lock_desc *new, void *data,
2950                            int flag)
2951 {
2952         struct lustre_handle lockh = { 0 };
2953         int rc;
2954         ENTRY;
2955
2956         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
2957                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
2958                 LBUG();
2959         }
2960
2961         switch (flag) {
2962         case LDLM_CB_BLOCKING:
2963                 ldlm_lock2handle(lock, &lockh);
2964                 rc = ldlm_cli_cancel(&lockh);
2965                 if (rc != ELDLM_OK)
2966                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
2967                 break;
2968         case LDLM_CB_CANCELING: {
2969
2970                 ldlm_lock2handle(lock, &lockh);
2971                 /* This lock wasn't granted, don't try to do anything */
2972                 if (lock->l_req_mode != lock->l_granted_mode)
2973                         RETURN(0);
2974
2975                 cache_remove_lock(lock->l_conn_export->exp_obd->u.cli.cl_cache,
2976                                   &lockh);
2977
2978                 if (lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb)
2979                         lock->l_conn_export->exp_obd->u.cli.cl_ext_lock_cancel_cb(
2980                                                           lock, new, data,flag);
2981                 break;
2982         }
2983         default:
2984                 LBUG();
2985         }
2986
2987         RETURN(0);
2988 }
2989 EXPORT_SYMBOL(osc_extent_blocking_cb);
2990
2991 static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
2992                                     int flags)
2993 {
2994         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2995
2996         if (lock == NULL) {
2997                 CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
2998                 return;
2999         }
3000         lock_res_and_lock(lock);
3001 #if defined (__KERNEL__) && defined (__linux__)
3002         /* Liang XXX: Darwin and Winnt checking should be added */
3003         if (lock->l_ast_data && lock->l_ast_data != data) {
3004                 struct inode *new_inode = data;
3005                 struct inode *old_inode = lock->l_ast_data;
3006                 if (!(old_inode->i_state & I_FREEING))
3007                         LDLM_ERROR(lock, "inconsistent l_ast_data found");
3008                 LASSERTF(old_inode->i_state & I_FREEING,
3009                          "Found existing inode %p/%lu/%u state %lu in lock: "
3010                          "setting data to %p/%lu/%u\n", old_inode,
3011                          old_inode->i_ino, old_inode->i_generation,
3012                          old_inode->i_state,
3013                          new_inode, new_inode->i_ino, new_inode->i_generation);
3014         }
3015 #endif
3016         lock->l_ast_data = data;
3017         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
3018         unlock_res_and_lock(lock);
3019         LDLM_LOCK_PUT(lock);
3020 }
3021
3022 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3023                              ldlm_iterator_t replace, void *data)
3024 {
3025         struct ldlm_res_id res_id;
3026         struct obd_device *obd = class_exp2obd(exp);
3027
3028         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3029         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3030         return 0;
3031 }
3032
3033 static int osc_enqueue_fini(struct obd_device *obd, struct ptlrpc_request *req,
3034                             struct obd_info *oinfo, int intent, int rc)
3035 {
3036         ENTRY;
3037
3038         if (intent) {
3039                 /* The request was created before ldlm_cli_enqueue call. */
3040                 if (rc == ELDLM_LOCK_ABORTED) {
3041                         struct ldlm_reply *rep;
3042
3043                         /* swabbed by ldlm_cli_enqueue() */
3044                         LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
3045                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
3046                                              sizeof(*rep));
3047                         LASSERT(rep != NULL);
3048                         if (rep->lock_policy_res1)
3049                                 rc = rep->lock_policy_res1;
3050                 }
3051         }
3052
3053         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3054                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3055                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_size,
3056                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_blocks,
3057                        oinfo->oi_md->lsm_oinfo[0]->loi_lvb.lvb_mtime);
3058         }
3059
3060         if (!rc)
3061                 cache_add_lock(obd->u.cli.cl_cache, oinfo->oi_lockh);
3062
3063         /* Call the update callback. */
3064         rc = oinfo->oi_cb_up(oinfo, rc);
3065         RETURN(rc);
3066 }
3067
3068 static int osc_enqueue_interpret(struct ptlrpc_request *req,
3069                                  struct osc_enqueue_args *aa, int rc)
3070 {
3071         int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
3072         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
3073         struct ldlm_lock *lock;
3074
3075         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3076          * be valid. */
3077         lock = ldlm_handle2lock(aa->oa_oi->oi_lockh);
3078
3079         /* Complete obtaining the lock procedure. */
3080         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3081                                    aa->oa_ei->ei_mode,
3082                                    &aa->oa_oi->oi_flags,
3083                                    &lsm->lsm_oinfo[0]->loi_lvb,
3084                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
3085                                    lustre_swab_ost_lvb,
3086                                    aa->oa_oi->oi_lockh, rc);
3087
3088         /* Complete osc stuff. */
3089         rc = osc_enqueue_fini(aa->oa_exp->exp_obd, req, aa->oa_oi, intent, rc);
3090
3091         /* Release the lock for async request. */
3092         if (lustre_handle_is_used(aa->oa_oi->oi_lockh) && rc == ELDLM_OK)
3093                 ldlm_lock_decref(aa->oa_oi->oi_lockh, aa->oa_ei->ei_mode);
3094
3095         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3096                  aa->oa_oi->oi_lockh, req, aa);
3097         LDLM_LOCK_PUT(lock);
3098         return rc;
3099 }
3100
3101 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3102  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3103  * other synchronous requests, however keeping some locks and trying to obtain
3104  * others may take a considerable amount of time in a case of ost failure; and
3105  * when other sync requests do not get released lock from a client, the client
3106  * is excluded from the cluster -- such scenarious make the life difficult, so
3107  * release locks just after they are obtained. */
3108 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3109                        struct ldlm_enqueue_info *einfo,
3110                        struct ptlrpc_request_set *rqset)
3111 {
3112         struct ldlm_res_id res_id;
3113         struct obd_device *obd = exp->exp_obd;
3114         struct ldlm_reply *rep;
3115         struct ptlrpc_request *req = NULL;
3116         int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
3117         ldlm_mode_t mode;
3118         int rc;
3119         ENTRY;
3120
3121         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3122                            oinfo->oi_md->lsm_object_gr, &res_id);
3123         /* Filesystem lock extents are extended to page boundaries so that
3124          * dealing with the page cache is a little smoother.  */
3125         oinfo->oi_policy.l_extent.start -=
3126                 oinfo->oi_policy.l_extent.start & ~CFS_PAGE_MASK;
3127         oinfo->oi_policy.l_extent.end |= ~CFS_PAGE_MASK;
3128
3129         if (oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid == 0)
3130                 goto no_match;
3131
3132         /* Next, search for already existing extent locks that will cover us */
3133         /* If we're trying to read, we also search for an existing PW lock.  The
3134          * VFS and page cache already protect us locally, so lots of readers/
3135          * writers can share a single PW lock.
3136          *
3137          * There are problems with conversion deadlocks, so instead of
3138          * converting a read lock to a write lock, we'll just enqueue a new
3139          * one.
3140          *
3141          * At some point we should cancel the read lock instead of making them
3142          * send us a blocking callback, but there are problems with canceling
3143          * locks out from other users right now, too. */
3144         mode = einfo->ei_mode;
3145         if (einfo->ei_mode == LCK_PR)
3146                 mode |= LCK_PW;
3147         mode = ldlm_lock_match(obd->obd_namespace,
3148                                oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
3149                                einfo->ei_type, &oinfo->oi_policy, mode,
3150                                oinfo->oi_lockh);
3151         if (mode) {
3152                 /* addref the lock only if not async requests and PW lock is
3153                  * matched whereas we asked for PR. */
3154                 if (!rqset && einfo->ei_mode != mode)
3155                         ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
3156                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
3157                                         oinfo->oi_flags);
3158                 if (intent) {
3159                         /* I would like to be able to ASSERT here that rss <=
3160                          * kms, but I can't, for reasons which are explained in
3161                          * lov_enqueue() */
3162                 }
3163
3164                 /* We already have a lock, and it's referenced */
3165                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
3166
3167                 /* For async requests, decref the lock. */
3168                 if (einfo->ei_mode != mode)
3169                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
3170                 else if (rqset)
3171                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
3172
3173                 RETURN(ELDLM_OK);
3174         }
3175
3176  no_match:
3177         if (intent) {
3178                 __u32 size[3] = {
3179                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
3180                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
3181                         [DLM_LOCKREQ_OFF + 1] = 0 };
3182
3183                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
3184                 if (req == NULL)
3185                         RETURN(-ENOMEM);
3186
3187                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
3188                 size[DLM_REPLY_REC_OFF] =
3189                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
3190                 ptlrpc_req_set_repsize(req, 3, size);
3191         }
3192
3193         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3194         oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
3195
3196         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id,
3197                               &oinfo->oi_policy, &oinfo->oi_flags,
3198                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3199                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
3200                               lustre_swab_ost_lvb, oinfo->oi_lockh,
3201                               rqset ? 1 : 0);
3202         if (rqset) {
3203                 if (!rc) {
3204                         struct osc_enqueue_args *aa;
3205                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3206                         aa = ptlrpc_req_async_args(req);
3207                         aa->oa_oi = oinfo;
3208                         aa->oa_ei = einfo;
3209                         aa->oa_exp = exp;
3210
3211                         req->rq_interpret_reply = osc_enqueue_interpret;
3212                         ptlrpc_set_add_req(rqset, req);
3213                 } else if (intent) {
3214                         ptlrpc_req_finished(req);
3215                 }
3216                 RETURN(rc);
3217         }
3218
3219         rc = osc_enqueue_fini(obd, req, oinfo, intent, rc);
3220         if (intent)
3221                 ptlrpc_req_finished(req);
3222
3223         RETURN(rc);
3224 }
3225
3226 static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
3227                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3228                      int *flags, void *data, struct lustre_handle *lockh,
3229                      int *n_matches)
3230 {
3231         struct ldlm_res_id res_id;
3232         struct obd_device *obd = exp->exp_obd;
3233         int lflags = *flags;
3234         ldlm_mode_t rc;
3235         ENTRY;
3236
3237         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
3238
3239         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
3240
3241         /* Filesystem lock extents are extended to page boundaries so that
3242          * dealing with the page cache is a little smoother */
3243         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3244         policy->l_extent.end |= ~CFS_PAGE_MASK;
3245
3246         /* Next, search for already existing extent locks that will cover us */
3247         /* If we're trying to read, we also search for an existing PW lock.  The
3248          * VFS and page cache already protect us locally, so lots of readers/
3249          * writers can share a single PW lock. */
3250         rc = mode;
3251         if (mode == LCK_PR)
3252                 rc |= LCK_PW;
3253         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3254                              &res_id, type, policy, rc, lockh);
3255         if (rc) {
3256                 osc_set_data_with_check(lockh, data, lflags);
3257                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3258                         ldlm_lock_addref(lockh, LCK_PR);
3259                         ldlm_lock_decref(lockh, LCK_PW);
3260                 }
3261                 if (n_matches != NULL)
3262                         (*n_matches)++;
3263         }
3264
3265         RETURN(rc);
3266 }
3267
3268 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3269                       __u32 mode, struct lustre_handle *lockh)
3270 {
3271         ENTRY;
3272
3273         if (unlikely(mode == LCK_GROUP))
3274                 ldlm_lock_decref_and_cancel(lockh, mode);
3275         else
3276                 ldlm_lock_decref(lockh, mode);
3277
3278         RETURN(0);
3279 }
3280
3281 static int osc_cancel_unused(struct obd_export *exp,
3282                              struct lov_stripe_md *lsm, int flags, void *opaque)
3283 {
3284         struct obd_device *obd = class_exp2obd(exp);
3285         struct ldlm_res_id res_id, *resp = NULL;
3286
3287         if (lsm != NULL) {
3288                 resp = osc_build_res_name(lsm->lsm_object_id,
3289                                           lsm->lsm_object_gr, &res_id);
3290         }
3291
3292         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3293
3294 }
3295
3296 static int osc_join_lru(struct obd_export *exp,
3297                         struct lov_stripe_md *lsm, int join)
3298 {
3299         struct obd_device *obd = class_exp2obd(exp);
3300         struct ldlm_res_id res_id, *resp = NULL;
3301
3302         if (lsm != NULL) {
3303                 resp = osc_build_res_name(lsm->lsm_object_id,
3304                                           lsm->lsm_object_gr, &res_id);
3305         }
3306
3307         return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
3308
3309 }
3310
3311 static int osc_statfs_interpret(struct ptlrpc_request *req,
3312                                 struct osc_async_args *aa, int rc)
3313 {
3314         struct obd_statfs *msfs;
3315         ENTRY;
3316
3317         if (rc != 0)
3318                 GOTO(out, rc);
3319
3320         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3321                                   lustre_swab_obd_statfs);
3322         if (msfs == NULL) {
3323                 CERROR("Can't unpack obd_statfs\n");
3324                 GOTO(out, rc = -EPROTO);
3325         }
3326
3327         memcpy(aa->aa_oi->oi_osfs, msfs, sizeof(*msfs));
3328 out:
3329         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3330         RETURN(rc);
3331 }
3332
3333 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3334                             __u64 max_age, struct ptlrpc_request_set *rqset)
3335 {
3336         struct ptlrpc_request *req;
3337         struct osc_async_args *aa;
3338         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
3339         ENTRY;
3340
3341         /* We could possibly pass max_age in the request (as an absolute
3342          * timestamp or a "seconds.usec ago") so the target can avoid doing
3343          * extra calls into the filesystem if that isn't necessary (e.g.
3344          * during mount that would help a bit).  Having relative timestamps
3345          * is not so great if request processing is slow, while absolute
3346          * timestamps are not ideal because they need time synchronization. */
3347         req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OST_VERSION,
3348                               OST_STATFS, 1, NULL, NULL);
3349         if (!req)
3350                 RETURN(-ENOMEM);
3351
3352         ptlrpc_req_set_repsize(req, 2, size);
3353         req->rq_request_portal = OST_CREATE_PORTAL;
3354         ptlrpc_at_set_req_timeout(req);
3355         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3356                 /* procfs requests not want stat in wait for avoid deadlock */
3357                 req->rq_no_resend = 1;
3358                 req->rq_no_delay = 1;
3359         }
3360
3361         req->rq_interpret_reply = osc_statfs_interpret;
3362         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3363         aa = ptlrpc_req_async_args(req);
3364         aa->aa_oi = oinfo;
3365
3366         ptlrpc_set_add_req(rqset, req);
3367         RETURN(0);
3368 }
3369
3370 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3371                       __u64 max_age, __u32 flags)
3372 {
3373         struct obd_statfs *msfs;
3374         struct ptlrpc_request *req;
3375         struct obd_import     *imp = NULL;
3376         __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
3377         int rc;
3378         ENTRY;
3379
3380         /*Since the request might also come from lprocfs, so we need
3381          *sync this with client_disconnect_export Bug15684*/
3382         down_read(&obd->u.cli.cl_sem);
3383         if (obd->u.cli.cl_import)
3384                 imp = class_import_get(obd->u.cli.cl_import);
3385         up_read(&obd->u.cli.cl_sem);
3386         if (!imp)
3387                 RETURN(-ENODEV);
3388
3389         /* We could possibly pass max_age in the request (as an absolute
3390          * timestamp or a "seconds.usec ago") so the target can avoid doing
3391          * extra calls into the filesystem if that isn't necessary (e.g.
3392          * during mount that would help a bit).  Having relative timestamps
3393          * is not so great if request processing is slow, while absolute
3394          * timestamps are not ideal because they need time synchronization. */
3395         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION,
3396                               OST_STATFS, 1, NULL, NULL);
3397
3398         class_import_put(imp);
3399         if (!req)
3400                 RETURN(-ENOMEM);
3401
3402         ptlrpc_req_set_repsize(req, 2, size);
3403         req->rq_request_portal = OST_CREATE_PORTAL;
3404         ptlrpc_at_set_req_timeout(req);
3405
3406         if (flags & OBD_STATFS_NODELAY) {
3407                 /* procfs requests not want stat in wait for avoid deadlock */
3408                 req->rq_no_resend = 1;
3409                 req->rq_no_delay = 1;
3410         }
3411
3412         rc = ptlrpc_queue_wait(req);
3413         if (rc)
3414                 GOTO(out, rc);
3415
3416         msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs),
3417                                   lustre_swab_obd_statfs);
3418         if (msfs == NULL) {
3419                 CERROR("Can't unpack obd_statfs\n");
3420                 GOTO(out, rc = -EPROTO);
3421         }
3422
3423         memcpy(osfs, msfs, sizeof(*osfs));
3424
3425         EXIT;
3426  out:
3427         ptlrpc_req_finished(req);
3428         return rc;
3429 }
3430
3431 /* Retrieve object striping information.
3432  *
3433  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3434  * the maximum number of OST indices which will fit in the user buffer.
3435  * lmm_magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3 (we only use 1 slot here).
3436  */
3437 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3438 {
3439         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3440         struct lov_user_md_v3 lum, *lumk;
3441         int rc = 0, lum_size;
3442         struct lov_user_ost_data_v1 *lmm_objects;
3443         ENTRY;
3444
3445         if (!lsm)
3446                 RETURN(-ENODATA);
3447
3448         /* we only need the header part from user space to get lmm_magic and
3449          * lmm_stripe_count, (the header part is common to v1 and v3) */
3450         lum_size = sizeof(struct lov_user_md_v1);
3451         if (copy_from_user(&lum, lump, lum_size))
3452                 RETURN(-EFAULT);
3453
3454         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3455             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3456                 RETURN(-EINVAL);
3457
3458         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3459         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3460         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3461         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3462
3463         /* we can use lov_mds_md_size() to compute lum_size
3464          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3465         if (lum.lmm_stripe_count > 0) {
3466                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3467                 OBD_ALLOC(lumk, lum_size);
3468                 if (!lumk)
3469                         RETURN(-ENOMEM);
3470                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3471                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3472                 else
3473                         lmm_objects = &(lumk->lmm_objects[0]);
3474                 lmm_objects->l_object_id = lsm->lsm_object_id;
3475         } else {
3476                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3477                 lumk = &lum;
3478         }
3479
3480         lumk->lmm_object_id = lsm->lsm_object_id;
3481         lumk->lmm_stripe_count = 1;
3482
3483         if (copy_to_user(lump, lumk, lum_size))
3484                 rc = -EFAULT;
3485
3486         if (lumk != &lum)
3487                 OBD_FREE(lumk, lum_size);
3488
3489         RETURN(rc);
3490 }
3491
3492
3493 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3494                          void *karg, void *uarg)
3495 {
3496         struct obd_device *obd = exp->exp_obd;
3497         struct obd_ioctl_data *data = karg;
3498         int err = 0;
3499         ENTRY;
3500
3501         if (!try_module_get(THIS_MODULE)) {
3502                 CERROR("Can't get module. Is it alive?");
3503                 return -EINVAL;
3504         }
3505         switch (cmd) {
3506         case OBD_IOC_LOV_GET_CONFIG: {
3507                 char *buf;
3508                 struct lov_desc *desc;
3509                 struct obd_uuid uuid;
3510
3511                 buf = NULL;
3512                 len = 0;
3513                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3514                         GOTO(out, err = -EINVAL);
3515
3516                 data = (struct obd_ioctl_data *)buf;
3517
3518                 if (sizeof(*desc) > data->ioc_inllen1) {
3519                         obd_ioctl_freedata(buf, len);
3520                         GOTO(out, err = -EINVAL);
3521                 }
3522
3523                 if (data->ioc_inllen2 < sizeof(uuid)) {
3524                         obd_ioctl_freedata(buf, len);
3525                         GOTO(out, err = -EINVAL);
3526                 }
3527
3528                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3529                 desc->ld_tgt_count = 1;
3530                 desc->ld_active_tgt_count = 1;
3531                 desc->ld_default_stripe_count = 1;
3532                 desc->ld_default_stripe_size = 0;
3533                 desc->ld_default_stripe_offset = 0;
3534                 desc->ld_pattern = 0;
3535                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3536
3537                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3538
3539                 err = copy_to_user((void *)uarg, buf, len);
3540                 if (err)
3541                         err = -EFAULT;
3542                 obd_ioctl_freedata(buf, len);
3543                 GOTO(out, err);
3544         }
3545         case LL_IOC_LOV_SETSTRIPE:
3546                 err = obd_alloc_memmd(exp, karg);
3547                 if (err > 0)
3548                         err = 0;
3549                 GOTO(out, err);
3550         case LL_IOC_LOV_GETSTRIPE:
3551                 err = osc_getstripe(karg, uarg);
3552                 GOTO(out, err);
3553         case OBD_IOC_CLIENT_RECOVER:
3554                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3555                                             data->ioc_inlbuf1);
3556                 if (err > 0)
3557                         err = 0;
3558                 GOTO(out, err);
3559         case IOC_OSC_SET_ACTIVE:
3560                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3561                                                data->ioc_offset);
3562                 GOTO(out, err);
3563         case OBD_IOC_POLL_QUOTACHECK:
3564                 err = lquota_poll_check(quota_interface, exp,
3565                                         (struct if_quotacheck *)karg);
3566                 GOTO(out, err);
3567         case OBD_IOC_DESTROY: {
3568                 struct obdo            *oa;
3569
3570                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
3571                         GOTO (out, err = -EPERM);
3572                 oa = &data->ioc_obdo1;
3573
3574                 if (oa->o_id == 0)
3575                         GOTO(out, err = -EINVAL);
3576
3577                 oa->o_valid |= OBD_MD_FLGROUP;
3578
3579                 err = osc_destroy(exp, oa, NULL, NULL, NULL);
3580                 GOTO(out, err);
3581         }
3582         case OBD_IOC_PING_TARGET:
3583                 err = ptlrpc_obd_ping(obd);
3584                 GOTO(out, err);
3585         default:
3586                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3587                        cmd, cfs_curproc_comm());
3588                 GOTO(out, err = -ENOTTY);
3589         }
3590 out:
3591         module_put(THIS_MODULE);
3592         return err;
3593 }
3594
3595 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3596                         void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
3597 {
3598         ENTRY;
3599         if (!vallen || !val)
3600                 RETURN(-EFAULT);
3601
3602         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3603                 __u32 *stripe = val;
3604                 *vallen = sizeof(*stripe);
3605                 *stripe = 0;
3606                 RETURN(0);
3607         } else if (KEY_IS(KEY_LAST_ID)) {
3608                 struct ptlrpc_request *req;
3609                 obd_id *reply;
3610                 char *bufs[2] = { NULL, key };
3611                 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3612                 int rc;
3613
3614                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3615                                       OST_GET_INFO, 2, size, bufs);
3616                 if (req == NULL)
3617                         RETURN(-ENOMEM);
3618
3619                 size[REPLY_REC_OFF] = *vallen;
3620                 ptlrpc_req_set_repsize(req, 2, size);
3621                 rc = ptlrpc_queue_wait(req);
3622                 if (rc)
3623                         GOTO(out, rc);
3624
3625                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*reply),
3626                                            lustre_swab_ost_last_id);
3627                 if (reply == NULL) {
3628                         CERROR("Can't unpack OST last ID\n");
3629                         GOTO(out, rc = -EPROTO);
3630                 }
3631                 *((obd_id *)val) = *reply;
3632         out:
3633                 ptlrpc_req_finished(req);
3634                 RETURN(rc);
3635         } else if (KEY_IS(KEY_FIEMAP)) {
3636                 struct ptlrpc_request *req;
3637                 struct ll_user_fiemap *reply;
3638                 char *bufs[2] = { NULL, key };
3639                 __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
3640                 int rc;
3641
3642                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
3643                                       OST_GET_INFO, 2, size, bufs);
3644                 if (req == NULL)
3645                         RETURN(-ENOMEM);
3646
3647                 size[REPLY_REC_OFF] = *vallen;
3648                 ptlrpc_req_set_repsize(req, 2, size);
3649
3650                 rc = ptlrpc_queue_wait(req);
3651                 if (rc)
3652                         GOTO(out1, rc);
3653                 reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
3654                                            lustre_swab_fiemap);
3655                 if (reply == NULL) {
3656                         CERROR("Can't unpack FIEMAP reply.\n");
3657                         GOTO(out1, rc = -EPROTO);
3658                 }
3659
3660                 memcpy(val, reply, *vallen);
3661
3662         out1:
3663                 ptlrpc_req_finished(req);
3664
3665                 RETURN(rc);
3666         }
3667
3668         RETURN(-EINVAL);
3669 }
3670
3671 static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
3672                                           void *aa, int rc)
3673 {
3674         struct llog_ctxt *ctxt;
3675         struct obd_import *imp = req->rq_import;
3676         ENTRY;
3677
3678         if (rc != 0)
3679                 RETURN(rc);
3680
3681         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3682         if (ctxt) {
3683                 if (rc == 0)
3684                         rc = llog_initiator_connect(ctxt);
3685                 else
3686                         CERROR("cannot establish connection for "
3687                                "ctxt %p: %d\n", ctxt, rc);
3688         }
3689
3690         llog_ctxt_put(ctxt);
3691         spin_lock(&imp->imp_lock);
3692         imp->imp_server_timeout = 1;
3693         imp->imp_pingable = 1;
3694         spin_unlock(&imp->imp_lock);
3695         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3696
3697         RETURN(rc);
3698 }
3699
3700 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3701                               void *key, obd_count vallen, void *val,
3702                               struct ptlrpc_request_set *set)
3703 {
3704         struct ptlrpc_request *req;
3705         struct obd_device  *obd = exp->exp_obd;
3706         struct obd_import *imp = class_exp2cliimp(exp);
3707         __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
3708         char *bufs[3] = { NULL, key, val };
3709         ENTRY;
3710
3711         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3712
3713         if (KEY_IS(KEY_NEXT_ID)) {
3714                 if (vallen != sizeof(obd_id))
3715                         RETURN(-EINVAL);
3716                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3717                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3718                        exp->exp_obd->obd_name,
3719                        obd->u.cli.cl_oscc.oscc_next_id);
3720
3721                 RETURN(0);
3722         }
3723
3724         if (KEY_IS(KEY_UNLINKED)) {
3725                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3726                 spin_lock(&oscc->oscc_lock);
3727                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3728                 spin_unlock(&oscc->oscc_lock);
3729                 RETURN(0);
3730         }
3731
3732         if (KEY_IS(KEY_INIT_RECOV)) {
3733                 if (vallen != sizeof(int))
3734                         RETURN(-EINVAL);
3735                 spin_lock(&imp->imp_lock);
3736                 imp->imp_initial_recov = *(int *)val;
3737                 spin_unlock(&imp->imp_lock);
3738                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3739                        exp->exp_obd->obd_name,
3740                        imp->imp_initial_recov);
3741                 RETURN(0);
3742         }
3743
3744         if (KEY_IS(KEY_CHECKSUM)) {
3745                 if (vallen != sizeof(int))
3746                         RETURN(-EINVAL);
3747                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3748                 RETURN(0);
3749         }
3750
3751         if (!set)
3752                 RETURN(-EINVAL);
3753
3754         /* We pass all other commands directly to OST. Since nobody calls osc
3755            methods directly and everybody is supposed to go through LOV, we
3756            assume lov checked invalid values for us.
3757            The only recognised values so far are evict_by_nid and mds_conn.
3758            Even if something bad goes through, we'd get a -EINVAL from OST
3759            anyway. */
3760
3761         req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_SET_INFO, 3, size,
3762                               bufs);
3763         if (req == NULL)
3764                 RETURN(-ENOMEM);
3765
3766         if (KEY_IS(KEY_MDS_CONN))
3767                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3768
3769         ptlrpc_req_set_repsize(req, 1, NULL);
3770         ptlrpc_set_add_req(set, req);
3771         ptlrpc_check_set(set);
3772
3773         RETURN(0);
3774 }
3775
3776
3777 static struct llog_operations osc_size_repl_logops = {
3778         lop_cancel: llog_obd_repl_cancel
3779 };
3780
3781 static struct llog_operations osc_mds_ost_orig_logops;
3782 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
3783                          int count, struct llog_catid *catid,
3784                          struct obd_uuid *uuid)
3785 {
3786         int rc;
3787         ENTRY;
3788
3789         spin_lock(&obd->obd_dev_lock);
3790         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3791                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3792                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3793                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3794                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3795                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3796         }
3797         spin_unlock(&obd->obd_dev_lock);
3798
3799         rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3800                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3801         if (rc) {
3802                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3803                 GOTO (out, rc);
3804         }
3805
3806         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
3807                         &osc_size_repl_logops);
3808         if (rc) {
3809                 struct llog_ctxt *ctxt =
3810                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3811                 if (ctxt)
3812                         llog_cleanup(ctxt);
3813                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3814         }
3815 out:
3816         if (rc) {
3817                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3818                        obd->obd_name, tgt->obd_name, count, catid, rc);
3819                 CERROR("logid "LPX64":0x%x\n",
3820                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3821         }
3822         RETURN(rc);
3823 }
3824
3825 static int osc_llog_finish(struct obd_device *obd, int count)
3826 {
3827         struct llog_ctxt *ctxt;
3828         int rc = 0, rc2 = 0;
3829         ENTRY;
3830
3831         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3832         if (ctxt)
3833                 rc = llog_cleanup(ctxt);
3834
3835         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3836         if (ctxt)
3837                 rc2 = llog_cleanup(ctxt);
3838         if (!rc)
3839                 rc = rc2;
3840
3841         RETURN(rc);
3842 }
3843
3844 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
3845                          struct obd_uuid *cluuid,
3846                          struct obd_connect_data *data,
3847                          void *localdata)
3848 {
3849         struct client_obd *cli = &obd->u.cli;
3850
3851         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3852                 long lost_grant;
3853
3854                 client_obd_list_lock(&cli->cl_loi_list_lock);
3855                 data->ocd_grant = cli->cl_avail_grant ?:
3856                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3857                 lost_grant = cli->cl_lost_grant;
3858                 cli->cl_lost_grant = 0;
3859                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3860
3861                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3862                        "cl_lost_grant: %ld\n", data->ocd_grant,
3863                        cli->cl_avail_grant, lost_grant);
3864                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3865                        " ocd_grant: %d\n", data->ocd_connect_flags,
3866                        data->ocd_version, data->ocd_grant);
3867         }
3868
3869         RETURN(0);
3870 }
3871
3872 static int osc_disconnect(struct obd_export *exp)
3873 {
3874         struct obd_device *obd = class_exp2obd(exp);
3875         struct llog_ctxt  *ctxt;
3876         int rc;
3877
3878         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3879         if (ctxt) {
3880                 if (obd->u.cli.cl_conn_count == 1) {
3881                         /* Flush any remaining cancel messages out to the
3882                          * target */
3883                         llog_sync(ctxt, exp);
3884                 }
3885                 llog_ctxt_put(ctxt);
3886         } else {
3887                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3888                        obd);
3889         }
3890
3891         rc = client_disconnect_export(exp);
3892         return rc;
3893 }
3894
3895 static int osc_import_event(struct obd_device *obd,
3896                             struct obd_import *imp,
3897                             enum obd_import_event event)
3898 {
3899         struct client_obd *cli;
3900         int rc = 0;
3901
3902         ENTRY;
3903         LASSERT(imp->imp_obd == obd);
3904
3905         switch (event) {
3906         case IMP_EVENT_DISCON: {
3907                 /* Only do this on the MDS OSC's */
3908                 if (imp->imp_server_timeout) {
3909                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3910
3911                         spin_lock(&oscc->oscc_lock);
3912                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3913                         spin_unlock(&oscc->oscc_lock);
3914                 }
3915                 cli = &obd->u.cli;
3916                 client_obd_list_lock(&cli->cl_loi_list_lock);
3917                 cli->cl_avail_grant = 0;
3918                 cli->cl_lost_grant = 0;
3919                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3920                 ptlrpc_import_setasync(imp, -1);
3921
3922                 break;
3923         }
3924         case IMP_EVENT_INACTIVE: {
3925                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3926                 break;
3927         }
3928         case IMP_EVENT_INVALIDATE: {
3929                 struct ldlm_namespace *ns = obd->obd_namespace;
3930
3931                 /* Reset grants */
3932                 cli = &obd->u.cli;
3933                 client_obd_list_lock(&cli->cl_loi_list_lock);
3934                 /* all pages go to failing rpcs due to the invalid import */
3935                 osc_check_rpcs(cli);
3936                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3937
3938                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3939
3940                 break;
3941         }
3942         case IMP_EVENT_ACTIVE: {
3943                 /* Only do this on the MDS OSC's */
3944                 if (imp->imp_server_timeout) {
3945                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3946
3947                         spin_lock(&oscc->oscc_lock);
3948                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3949                         spin_unlock(&oscc->oscc_lock);
3950                 }
3951                 CDEBUG(D_INFO, "notify server \n");
3952                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3953                 break;
3954         }
3955         case IMP_EVENT_OCD: {
3956                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3957
3958                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3959                         osc_init_grant(&obd->u.cli, ocd);
3960
3961                 /* See bug 7198 */
3962                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3963                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3964
3965                 ptlrpc_import_setasync(imp, 1);
3966                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3967                 break;
3968         }
3969         default:
3970                 CERROR("Unknown import event %d\n", event);
3971                 LBUG();
3972         }
3973         RETURN(rc);
3974 }
3975
3976 int osc_setup(struct obd_device *obd, obd_count len, void *buf)
3977 {
3978         int rc;
3979         ENTRY;
3980
3981         ENTRY;
3982         rc = ptlrpcd_addref();
3983         if (rc)
3984                 RETURN(rc);
3985
3986         rc = client_obd_setup(obd, len, buf);
3987         if (rc) {
3988                 ptlrpcd_decref();
3989         } else {
3990                 struct lprocfs_static_vars lvars = { 0 };
3991                 struct client_obd *cli = &obd->u.cli;
3992
3993                 lprocfs_osc_init_vars(&lvars);
3994                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3995                         lproc_osc_attach_seqstat(obd);
3996                         ptlrpc_lprocfs_register_obd(obd);
3997                 }
3998
3999                 oscc_init(obd);
4000                 /* We need to allocate a few requests more, because
4001                    brw_interpret tries to create new requests before freeing
4002                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4003                    reserved, but I afraid that might be too much wasted RAM
4004                    in fact, so 2 is just my guess and still should work. */
4005                 cli->cl_import->imp_rq_pool =
4006                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4007                                             OST_MAXREQSIZE,
4008                                             ptlrpc_add_rqs_to_pool);
4009                 cli->cl_cache = cache_create(obd);
4010                 if (!cli->cl_cache) {
4011                         osc_cleanup(obd);
4012                         rc = -ENOMEM;
4013                 }
4014         }
4015
4016         RETURN(rc);
4017 }
4018
4019 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4020 {
4021         int rc = 0;
4022         ENTRY;
4023
4024         switch (stage) {
4025         case OBD_CLEANUP_EARLY: {
4026                 struct obd_import *imp;
4027                 imp = obd->u.cli.cl_import;
4028                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4029                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4030                 ptlrpc_deactivate_import(imp);
4031                 break;
4032         }
4033         case OBD_CLEANUP_EXPORTS: {
4034                 /* If we set up but never connected, the
4035                    client import will not have been cleaned. */
4036                 if (obd->u.cli.cl_import) {
4037                         struct obd_import *imp;
4038                         down_write(&obd->u.cli.cl_sem);
4039                         imp = obd->u.cli.cl_import;
4040                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4041                                obd->obd_name);
4042                         ptlrpc_invalidate_import(imp);
4043                         if (imp->imp_rq_pool) {
4044                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4045                                 imp->imp_rq_pool = NULL;
4046                         }
4047                         class_destroy_import(imp);
4048                         up_write(&obd->u.cli.cl_sem);
4049                         obd->u.cli.cl_import = NULL;
4050                 }
4051                 rc = obd_llog_finish(obd, 0);
4052                 if (rc != 0)
4053                         CERROR("failed to cleanup llogging subsystems\n");
4054                 break;
4055         }
4056         case OBD_CLEANUP_SELF_EXP:
4057                 break;
4058         case OBD_CLEANUP_OBD:
4059                 break;
4060         }
4061         RETURN(rc);
4062 }
4063
4064 int osc_cleanup(struct obd_device *obd)
4065 {
4066         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4067         int rc;
4068
4069         ENTRY;
4070         ptlrpc_lprocfs_unregister_obd(obd);
4071         lprocfs_obd_cleanup(obd);
4072
4073         spin_lock(&oscc->oscc_lock);
4074         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4075         oscc->oscc_flags |= OSCC_FLAG_EXITING;
4076         spin_unlock(&oscc->oscc_lock);
4077
4078         /* free memory of osc quota cache */
4079         lquota_cleanup(quota_interface, obd);
4080
4081         cache_destroy(obd->u.cli.cl_cache);
4082         rc = client_obd_cleanup(obd);
4083
4084         ptlrpcd_decref();
4085         RETURN(rc);
4086 }
4087
4088 static int osc_register_page_removal_cb(struct obd_device *obd,
4089                                         obd_page_removal_cb_t func,
4090                                         obd_pin_extent_cb pin_cb)
4091 {
4092         ENTRY;
4093
4094         /* this server - not need init */
4095         if (func == NULL)
4096                 return 0;
4097
4098         return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
4099                                            pin_cb);
4100 }
4101
4102 static int osc_unregister_page_removal_cb(struct obd_device *obd,
4103                                           obd_page_removal_cb_t func)
4104 {
4105         ENTRY;
4106         return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
4107 }
4108
4109 static int osc_register_lock_cancel_cb(struct obd_device *obd,
4110                                        obd_lock_cancel_cb cb)
4111 {
4112         ENTRY;
4113         LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
4114
4115         /* this server - not need init */
4116         if (cb == NULL)
4117                 return 0;
4118
4119         obd->u.cli.cl_ext_lock_cancel_cb = cb;
4120         return 0;
4121 }
4122
4123 static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
4124                                          obd_lock_cancel_cb cb)
4125 {
4126         ENTRY;
4127
4128         if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
4129                 CERROR("Unregistering cancel cb %p, while only %p was "
4130                        "registered\n", cb,
4131                        obd->u.cli.cl_ext_lock_cancel_cb);
4132                 RETURN(-EINVAL);
4133         }
4134
4135         obd->u.cli.cl_ext_lock_cancel_cb = NULL;
4136         return 0;
4137 }
4138
4139 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4140 {
4141         struct lustre_cfg *lcfg = buf;
4142         struct lprocfs_static_vars lvars = { 0 };
4143         int rc = 0;
4144
4145         lprocfs_osc_init_vars(&lvars);
4146
4147         rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, obd);
4148         return(rc);
4149 }
4150
4151 struct obd_ops osc_obd_ops = {
4152         .o_owner                = THIS_MODULE,
4153         .o_setup                = osc_setup,
4154         .o_precleanup           = osc_precleanup,
4155         .o_cleanup              = osc_cleanup,
4156         .o_add_conn             = client_import_add_conn,
4157         .o_del_conn             = client_import_del_conn,
4158         .o_connect              = client_connect_import,
4159         .o_reconnect            = osc_reconnect,
4160         .o_disconnect           = osc_disconnect,
4161         .o_statfs               = osc_statfs,
4162         .o_statfs_async         = osc_statfs_async,
4163         .o_packmd               = osc_packmd,
4164         .o_unpackmd             = osc_unpackmd,
4165         .o_precreate            = osc_precreate,
4166         .o_create               = osc_create,
4167         .o_destroy              = osc_destroy,
4168         .o_getattr              = osc_getattr,
4169         .o_getattr_async        = osc_getattr_async,
4170         .o_setattr              = osc_setattr,
4171         .o_setattr_async        = osc_setattr_async,
4172         .o_brw                  = osc_brw,
4173         .o_brw_async            = osc_brw_async,
4174         .o_prep_async_page      = osc_prep_async_page,
4175         .o_reget_short_lock     = osc_reget_short_lock,
4176         .o_release_short_lock   = osc_release_short_lock,
4177         .o_queue_async_io       = osc_queue_async_io,
4178         .o_set_async_flags      = osc_set_async_flags,
4179         .o_queue_group_io       = osc_queue_group_io,
4180         .o_trigger_group_io     = osc_trigger_group_io,
4181         .o_teardown_async_page  = osc_teardown_async_page,
4182         .o_punch                = osc_punch,
4183         .o_sync                 = osc_sync,
4184         .o_enqueue              = osc_enqueue,
4185         .o_match                = osc_match,
4186         .o_change_cbdata        = osc_change_cbdata,
4187         .o_cancel               = osc_cancel,
4188         .o_cancel_unused        = osc_cancel_unused,
4189         .o_join_lru             = osc_join_lru,
4190         .o_iocontrol            = osc_iocontrol,
4191         .o_get_info             = osc_get_info,
4192         .o_set_info_async       = osc_set_info_async,
4193         .o_import_event         = osc_import_event,
4194         .o_llog_init            = osc_llog_init,
4195         .o_llog_finish          = osc_llog_finish,
4196         .o_process_config       = osc_process_config,
4197         .o_register_page_removal_cb = osc_register_page_removal_cb,
4198         .o_unregister_page_removal_cb = osc_unregister_page_removal_cb,
4199         .o_register_lock_cancel_cb = osc_register_lock_cancel_cb,
4200         .o_unregister_lock_cancel_cb = osc_unregister_lock_cancel_cb,
4201 };
4202 int __init osc_init(void)
4203 {
4204         struct lprocfs_static_vars lvars = { 0 };
4205         int rc;
4206         ENTRY;
4207
4208         lprocfs_osc_init_vars(&lvars);
4209
4210         request_module("lquota");
4211         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4212         lquota_init(quota_interface);
4213         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4214
4215         rc = class_register_type(&osc_obd_ops, lvars.module_vars,
4216                                  LUSTRE_OSC_NAME);
4217         if (rc) {
4218                 if (quota_interface)
4219                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4220                 RETURN(rc);
4221         }
4222
4223         RETURN(rc);
4224 }
4225
#ifdef __KERNEL__
/*
 * Module exit: shut down the quota interface, release the quota symbol
 * reference if one is held, and unregister the OSC obd type.
 */
static void /*__exit*/ osc_exit(void)
{
        lquota_exit(quota_interface);

        /* The symbol reference exists only if the lookup in osc_init()
         * succeeded. */
        if (quota_interface) {
                PORTAL_SYMBOL_PUT(osc_quota_interface);
        }

        class_unregister_type(LUSTRE_OSC_NAME);
}

/* Kernel module metadata and entry points. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif /* __KERNEL__ */