NIO的callback调用方式

news/2024/10/3 22:16:27 标签: nio, 开发语言, java

1.消费者

java">public class CallbackClient {
    public static void main(String[] args) {
        try {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));

            ByteBuffer writeBuffer = ByteBuffer.allocate(32);
            ByteBuffer readBuffer = ByteBuffer.allocate(32);

            getMessage(readBuffer, socketChannel);
            sendRandomInt(writeBuffer, socketChannel, 1000);
            getMessage(readBuffer, socketChannel);

            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            sendRandomInt(writeBuffer, socketChannel, 10);
            getMessage(readBuffer, socketChannel);

            socketChannel.close();
        } catch (IOException e) {
        }
    }

    public static void sendRandomInt(ByteBuffer writeBuffer, SocketChannel socketChannel, int bound) {
        Random r = new Random();
        int d = 0;

        d = r.nextInt(bound);
        if (d == 0)
            d = 1;
        System.out.println(d);
        writeBuffer.clear();
        writeBuffer.put(String.valueOf(d).getBytes());
        writeBuffer.flip();
        try {
            socketChannel.write(writeBuffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void getMessage(ByteBuffer readBuffer, SocketChannel socketChannel) {
        readBuffer.clear();
        byte[] buf = new byte[16];
        try {
            socketChannel.read(readBuffer);
        } catch (IOException e) {
        }
        readBuffer.flip();
        readBuffer.get(buf, 0, readBuffer.remaining());
        System.out.println(new String(buf));
    }
}

2.服务提供者

java">package com.example.demo.callback;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
 
public class NioServer {
 
    public static void main(String[] args) throws IOException {
        // 打开服务器套接字通道
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.configureBlocking(false);
        serverSocket.socket().bind(new InetSocketAddress(8000));
 
        // 打开多路复用器
        Selector selector = Selector.open();
 
        // 注册服务器通道到多路复用器上,并监听接入事件
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
 
        final ByteBuffer buffer = ByteBuffer.allocate(1024);
 
        while (true) {
            // 非阻塞地等待注册的通道事件
            selector.select();
 
            // 获取发生事件的selectionKey集合
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectedKeys.iterator();
 
            // 遍历所有发生事件的selectionKey
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
 
                // 处理接入请求
                if (key.isAcceptable()) {
                	
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = ssc.accept();
                    socketChannel.configureBlocking(false);
                    SelectionKey newKey = socketChannel.register(selector, SelectionKey.OP_WRITE, ByteBuffer.allocate(1024));
                    
                    //添加后可使用处理方法2处理
					CommonClient client = new CommonClient(socketChannel, newKey);
                    newKey.attach(client);
                }
 
                // 处理读事件
                if (key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    buffer.clear();
                    while (socketChannel.read(buffer) > 0) {
                        buffer.flip();
                        String receivedMessage = StandardCharsets.UTF_8.decode(buffer).toString();
                        handleReceivedMessage(socketChannel, receivedMessage);
                        buffer.clear();
                    }

					//处理方法2
					 CommonClient client = (CommonClient) key.attachment();
                     client.onRead();
                }
 
                // 处理写事件
                if (key.isWritable()) {
						//处理方法1可以仿照方法2的格式写
						//处理方法2
                        CommonClient client = (CommonClient) key.attachment();
                        client.onWrite();
                    }
            }
        }
    }
 
    // 回调函数,处理接收到的数据
    private static void handleReceivedMessage(SocketChannel socketChannel, String message) throws IOException {
        System.out.println("Received message: " + message);
        // 回复客户端
        socketChannel.write(ByteBuffer.wrap("Server received the message".getBytes(StandardCharsets.UTF_8)));
    }
}


java">public class CommonClient {
        private SocketChannel clientSocket;
        private ByteBuffer recvBuffer;
        private SelectionKey key;
        private Callback callback;

        private String msg;


        public CommonClient(SocketChannel clientSocket, SelectionKey key) {
            this.clientSocket = clientSocket;
            this.key = key;
            recvBuffer = ByteBuffer.allocate(8);

            try {
                this.clientSocket.configureBlocking(false);
                key.interestOps(SelectionKey.OP_WRITE);
            } catch (IOException e) {
            }
        }

        public void close() {
            try {
                clientSocket.close();
                key.cancel();
            }
            catch (IOException e){};
        }

        // an rpc to notify client to send a number
        public void sendMessage(String msg, Callback cback)  {
            this.callback = cback;

            try {
                try {
                    recvBuffer.clear();
                    recvBuffer.put(msg.getBytes());
                    recvBuffer.flip();
                    clientSocket.write(recvBuffer);

                    key.interestOps(SelectionKey.OP_READ);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            catch (Exception e) {
            }
        }

        // when key is writable, resume the fiber to continue
        // to write.
        public void onWrite() {
            sendMessage("divident", new Callback() {
                @Override
                public void onSucceed(int data) {
                    int a = data;
                    sendMessage("divisor", new Callback() {
                        @Override
                        public void onSucceed(int data) {
                            int b = data;

                            sendMessage(String.valueOf(a / b), null);
                        }
                    });
                }
            });
        }

        public void onRead() {
            int res = 0;
            try {
                try {
                    recvBuffer.clear();

                    // read may fail even SelectionKey is readable
                    // when read fails, the fiber should suspend, waiting for next
                    // time the key is ready.
                    int n = clientSocket.read(recvBuffer);
                    while (n == 0) {
                        n = clientSocket.read(recvBuffer);
                    }

                    if (n == -1) {
                        close();
                        return;
                    }

                    System.out.println("received " + n + " bytes from client");
                } catch (IOException e) {
                    e.printStackTrace();
                }

                recvBuffer.flip();
                res = getInt(recvBuffer);

                // when read ends, we are no longer interested in reading,
                // but in writing.
                key.interestOps(SelectionKey.OP_WRITE);
            } catch (Exception e) {
            }

            this.callback.onSucceed(res);
        }

        public int getInt(ByteBuffer buf) {
            int r = 0;
            while (buf.hasRemaining()) {
                r *= 10;
                r += buf.get() - '0';
            }
            return r;
        }
    }
java">    public interface Callback {
        public void onSucceed(int data);
    }

http://www.niftyadmin.cn/n/5689035.html

相关文章

《安富莱嵌入式周报》第343期:雷电USB4开源示波器正式发布,卓越的模拟前端低噪便携示波器,自带100W电源的便携智能烙铁,NASA航空航天锂电池设计

周报汇总地址&#xff1a;嵌入式周报 - uCOS & uCGUI & emWin & embOS & TouchGFX & ThreadX - 硬汉嵌入式论坛 - Powered by Discuz! 更新一期视频教程 【授人以渔】CMSIS-RTOS V2封装层专题视频&#xff0c;一期视频将常用配置和用法梳理清楚&#xff0…

stract和strncat

stract函数原型&#xff1a; char *stract&#xff08;char *dest,conset char *src &#xff09;; 功能&#xff1a;将src字符串连接到dest的末尾&#xff0c;‘\0’也会追加过去。&#xff08;把两个字符串拼接在一起&#xff09; 返回值&#xff1a; 成功&#xff1a;返回…

北京市大兴区启动乐享生活 寻味大兴 美食嘉年华 系列促销费活动

北京市大兴区启动乐享生活 寻味大兴 系列促销费活动 区商务局副局长 兰莉 致开幕辞 区餐饮行业协会会长 董志明 介绍活动内容 2024年9月30日&#xff0c;由大兴区商务局主办、大兴区餐饮行业协会承办&#xff0c;并得到高米店街道和大兴绿地缤纷城大力支持的“乐享生活 寻味大…

系统安全 - Linux /Docker 安全模型及实践

文章目录 导图Linux安全Linux 安全模型用户层权限管理的细节多用户环境中的权限管理文件权限与目录权限 最小权限原则的应用Linux 系统中的认证、授权和审计机制认证机制授权机制审计机制 小结 内网安全Docker安全1. Docker 服务隔离机制Namespace 机制Capabilities 机制CGroup…

通信工程学习:什么是SNMP简单网络管理协议

SNMP&#xff1a;简单网络管理协议 SNMP&#xff08;Simple Network Management Protocol&#xff0c;简单网络管理协议&#xff09;是一种用于在计算机网络中管理网络节点&#xff08;如服务器、工作站、路由器、交换机等&#xff09;的标准协议。它属于OSI模型的应用层&#…

两个向量所在平面的法线,外积,叉积,行列式

偶尔在一个数学题里面看到求两向量所在平面的法线&#xff0c;常规方法可以通过法线与两向量垂直这一特点&#xff0c;列两个方程求解&#xff1b;另外一种方法可以通过求解两个向量的叉积&#xff0c;用矩阵行列式 (determinant) 的方式&#xff0c;之前还没见过&#xff0c;在…

大厂校招:海能达嵌入式面试题及参考答案

SPI 协议的一些基础知识 SPI(Serial Peripheral Interface)即串行外设接口,是一种高速的、全双工、同步的通信总线。 SPI 主要由四根信号线组成: 时钟线(SCLK):由主设备产生,用于同步数据传输。时钟的频率决定了数据传输的速度。主设备输出 / 从设备输入线(MOSI):主…

Ubuntu2404安装

Ubuntu是一款非常优秀的发行版本&#xff0c;起初她的优势主要在于桌面版&#xff0c;但是随着Centos 从服务版的支持的退出&#xff0c;Ubuntu server也在迅猛的成长&#xff0c;并且不断收获了用户&#xff0c;拥有了一大批忠实的粉丝。好了&#xff0c;废话不多说&#xff0…