JAVA通信编程(四)——UDP通讯

简介: 经过TCP和串口通讯编程的了解,相信大家应该掌握CommBuff的套路了,这里首先展示的是通过UDP编程的方式实现CommBuff接口,之后通过简单工厂模式的应用说明如何屏蔽底层通讯差异。 UdpImpl类如下: package com.

经过TCP和串口通讯编程的了解,相信大家应该掌握CommBuff的套路了,这里首先展示的是通过UDP编程的方式实现CommBuff接口,之后通过简单工厂模式的应用说明如何屏蔽底层通讯差异。

UdpImpl类如下:

package com.zzh.comm;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Map;

import org.apache.log4j.Logger;

public class UdpImpl implements CommBuff
{
	private Logger logger = Logger.getLogger(Object.class.getName());
	
	private int local_port;
	private int dest_port;
	private String ip;
	private int time_out;
	
	DatagramSocket client = null;
	
	private String fileName = "/udp.properties";
	public UdpImpl()
	{
		Map<String,String> map = new ReadProperties().getPropertiesMap(fileName);
		try
		{
			local_port = Integer.parseInt(map.get("udp_local_port"));
			dest_port = Integer.parseInt(map.get("udp_dest_port"));
			time_out = Integer.parseInt(map.get("udp_timeout"));
			ip = map.get("udp_dest_ip");
		}
		catch (Exception e)
		{
			logger.error(e.getMessage());
		}
	}
	
	@Override
	public byte[] readBuff()
	{
		if(client == null)
		{
			throw new RuntimeException("clinet is null!");
		}
		byte[] recvBuf = new byte[1024];
		DatagramPacket recvPacket = new DatagramPacket(recvBuf , recvBuf.length);
		try
		{
			client.receive(recvPacket);
		}
		catch (IOException e)
		{
			logger.info(e.getMessage());
			return new byte[0];
		}
		byte[] ans = new byte[recvPacket.getLength()];
		System.arraycopy(recvPacket.getData(), 0, ans, 0, recvPacket.getLength());
		logger.info("网口接收:"+CommUtil.bytesToHex(ans));
		return ans;
	}

	@Override
	public void writeBuff(byte[] message)
	{
		if(client == null)
		{
			throw new RuntimeException("clinet is null!");
		}
		
		try
		{
			InetAddress addr = InetAddress.getByName(ip);
			DatagramPacket sendPacket = new DatagramPacket(message,message.length,addr,dest_port);
			client.send(sendPacket);
			logger.info("发送成功: "+CommUtil.bytesToHex(message));
		}
		catch (UnknownHostException e)
		{
			logger.error(e.getMessage());
		}
		catch (IOException e)
		{
			logger.error(e.getMessage());
		}
		
	}

	@Override
	public void open() {
		try
		{
			client = new DatagramSocket(local_port);
			client.setSoTimeout(time_out);
			if(client != null)
			{
				logger.info("client open succeed!");
			}
		}
		catch (SocketException e)
		{
			logger.error(e.getMessage());
		}
	}

	@Override
	public void close() 
	{
		if(client != null)
		{
			client.close();
		}
	}

	@Override
	public Object getInfo()
	{
		return null;
	}

}
UdpImpl实现了CommBuff接口的各个方法。UDP Socket采用的数据包的方式进行通讯的,这个可以与TCP的方式区分开。

下面通过一个简单工厂模式,可以实现底层通讯的便利性。

package com.zzh.comm;

public class CommFactory
{
	public CommBuff getCommBuff(String properties) throws Exception
	{
		if(properties.equals("comm_serial"))
		{
			return new SerialImpl();
		}
		else if(properties.equals("comm_tcpServer"))
		{
			return new TcpServerImpl();
		}
		else if(properties.equals("comm_tcpClient"))
		{
			return new TcpClientImpl();
		}
		else if(properties.equals("comm_udp"))
		{
			return new UdpImpl();
		}
		else
		{
			throw new Exception("Communication para error: no found avaliable communication Object instance.");
		}
	}
}
上面的getCommBuff方法通过参数properties可以初始化不同的通讯接口实现类,这样上次应用只需调用Commbuff接口的方法,而无需与底层通讯的细节相融合,极大的降低了程序间的耦合性。

本篇就简单的阐述到这里。但是下面会附加一个程序,这个程序通过调用CommFactory的方法生成底层通讯的实例,程序的主要内容是电力行业的某个通讯规约(Modbus)的实现,如果非电力行业的通讯,可以不必了解程序中的细节,可以大概看一下怎么使用.

package com.zzh.protocol;

import java.util.Calendar;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.zzh.comm.CommBuff;
import com.zzh.comm.CommFactory;
import com.zzh.comm.CommUtil;
import com.zzh.comm.ReadProperties;
import com.zzh.dao.ModbusDao;
import com.zzh.dao.ModbusDaoImpl;
import com.zzh.dao.pojo.ModbusPojo;

public class Modbus {
	private CommBuff comm;
	private int comm_timeout;
	private byte devAddr;
	
	private static int RECV_SIZE = 35;
	private static int RECV_INNER_SIZE = 30;
	private static int MINUTE=60000;
	private volatile boolean  refreshFlag = false;
	
	private ModbusPojo modbusPojo; 
	
	private ConcurrentLinkedDeque<Byte> deque = new ConcurrentLinkedDeque<Byte>();
	private String fileName = "/modbus.properties";
	
	public Modbus()
	{
		Map<String,String> map = new ReadProperties().getPropertiesMap(fileName);
		String comm_way = map.get("modbus_comm_way");
		String comm_timeouts = map.get("comm_timeout");
		comm_timeout = Integer.parseInt(comm_timeouts);
		String devAddrs = map.get("devAddr");
		devAddr = Byte.parseByte(devAddrs);
		if(comm_way!=null)
		{
			modbusPojo = new ModbusPojo(); 
			try
			{
				comm = new CommFactory().getCommBuff(comm_way);
			}
			catch (Exception e)
			{
				e.printStackTrace();
			}
			comm.open();
			
			ExecutorService pool = Executors.newFixedThreadPool(2);
			Thread thread1 = new Thread(new readThread());
	    	thread1.setDaemon(true);
	    	Thread thread2 = new Thread(new dbThread());
	    	thread2.setDaemon(true);
	    	pool.execute(thread1);
	    	pool.execute(thread2);
		}
		else
		{
			throw new RuntimeException("没有配置好合适的串口参数");
		}
	}
	
	private class readThread implements Runnable
	{
		@Override
		public void run()
		{
			while(true)
			{
				byte[] recvBuff = comm.readBuff();
				if(recvBuff.length>0)
				{
					for(int i=0;i<recvBuff.length;i++)
					{
						deque.add(recvBuff[i]);
					}
				}
				try
				{
					TimeUnit.MILLISECONDS.sleep(1000);
				}
				catch (InterruptedException e)
				{
					e.printStackTrace();
				}
			}
		}
	}
	
	private class dbThread implements Runnable
	{
		@Override
		public void run()
		{
			while(true)
			{
				if(refreshFlag == true)
				{
					Calendar now = Calendar.getInstance();
					if(now.get(Calendar.MINUTE)%5==0)
//					if(true)
					{
						synchronized (modbusPojo)
						{
							filterModbusPojo();
							modbusPojo.setNow(TimeUtil.getDateOfMM(now));
//							modbusPojo.setNow(new java.sql.Timestamp(new Date().getTime()));
							ModbusDao md = new ModbusDaoImpl();
							md.addModbus(modbusPojo);
						}
					}
				}
				try
				{
					TimeUnit.MILLISECONDS.sleep(MINUTE);
//					TimeUnit.MILLISECONDS.sleep(1000);
				}
				catch (InterruptedException e)
				{
					e.printStackTrace();
				}
			}
		}
		
	}
	
	public void filterModbusPojo()
	{
		modbusPojo.setQua(0);
		if(modbusPojo.getEnvTemperature()>ModbusUtil.TEMPERATURE_UP)
		{
			modbusPojo.setEnvTemperature(ModbusUtil.TEMPERATURE_UP);
			System.out.println("getEnvTemperature = "+modbusPojo.getEnvTemperature());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getEnvTemperature()<ModbusUtil.TEMPERATURE_LOW)
		{
			modbusPojo.setEnvTemperature(ModbusUtil.TEMPERATURE_LOW);
			System.out.println("getEnvTemperature = "+modbusPojo.getEnvTemperature());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getTemperature()>ModbusUtil.TEMPERATURE_UP)
		{
			modbusPojo.setTemperature(ModbusUtil.TEMPERATURE_UP);
			System.out.println("getTemperature = "+modbusPojo.getTemperature());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getTemperature()<ModbusUtil.TEMPERATURE_LOW)
		{
			modbusPojo.setTemperature(ModbusUtil.TEMPERATURE_LOW);
			System.out.println("getTemperature = "+modbusPojo.getTemperature());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getHumidity()>ModbusUtil.HUMIDITY_UP)
		{
			modbusPojo.setHumidity(ModbusUtil.HUMIDITY_UP);
			System.out.println("getHumidity = "+modbusPojo.getHumidity());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getHumidity()<ModbusUtil.HUMIDITY_LOW)
		{
			modbusPojo.setHumidity(ModbusUtil.HUMIDITY_LOW);
			System.out.println("getHumidity = "+modbusPojo.getHumidity());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getPressure()>ModbusUtil.PRESSURE_UP)
		{
			modbusPojo.setPressure(ModbusUtil.PRESSURE_UP);
			System.out.println("getPressure = "+modbusPojo.getPressure());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getPressure()<ModbusUtil.PRESSURE_LOW)
		{
			modbusPojo.setPressure(ModbusUtil.PRESSURE_LOW);
			System.out.println("getPressure = "+modbusPojo.getPressure());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getIrradiance()>ModbusUtil.IRRADIANCE_UP)
		{
			modbusPojo.setIrradiance(ModbusUtil.IRRADIANCE_UP);
			System.out.println("getIrradiance = "+modbusPojo.getIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getIrradiance()<ModbusUtil.IRRADIANCE_LOW)
		{
			modbusPojo.setIrradiance(ModbusUtil.IRRADIANCE_LOW);
			System.out.println("getIrradiance = "+modbusPojo.getIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getScaIrradiance()>ModbusUtil.IRRADIANCE_UP)
		{
			modbusPojo.setScaIrradiance(ModbusUtil.IRRADIANCE_UP);
			System.out.println("getScaIrradiance = "+modbusPojo.getScaIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getScaIrradiance()<ModbusUtil.IRRADIANCE_LOW)
		{
			modbusPojo.setScaIrradiance(ModbusUtil.IRRADIANCE_LOW);
			System.out.println("getScaIrradiance = "+modbusPojo.getScaIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getDirIrradiance()>ModbusUtil.IRRADIANCE_UP)
		{
			modbusPojo.setDirIrradiance(ModbusUtil.IRRADIANCE_UP);
			System.out.println("getDirIrradiance = "+modbusPojo.getDirIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getDirIrradiance()<ModbusUtil.IRRADIANCE_LOW)
		{
			modbusPojo.setDirIrradiance(ModbusUtil.IRRADIANCE_LOW);
			System.out.println("getDirIrradiance = "+modbusPojo.getDirIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getWindSpeed()>ModbusUtil.UAVG_UP)
		{
			modbusPojo.setWindSpeed(ModbusUtil.UAVG_UP);
			System.out.println("getWindSpeed = "+modbusPojo.getWindSpeed());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getWindSpeed()<ModbusUtil.UAVG_LOW)
		{
			modbusPojo.setWindSpeed(ModbusUtil.UAVG_LOW);
			System.out.println("getWindSpeed = "+modbusPojo.getWindSpeed());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getWindDir()>ModbusUtil.VAVG_UP)
		{
			modbusPojo.setWindDir(ModbusUtil.VAVG_UP);
			System.out.println("getWindDir = "+modbusPojo.getWindDir());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getWindDir()<ModbusUtil.VAVG_LOW)
		{
			modbusPojo.setWindDir(ModbusUtil.VAVG_LOW);
			System.out.println("getWindDir = "+modbusPojo.getWindDir());
			modbusPojo.setQua(1);
		}
	}
	
	public void process()
	{
		try
		{
			TimeUnit.MILLISECONDS.sleep(comm_timeout);
		}
		catch (InterruptedException e)
		{
			e.printStackTrace();
		}
		recvProcess();
		sendProcess();
	}

	public void recvProcess()
	{
		refreshFlag = false;
		byte[] recvBuff = new byte[RECV_INNER_SIZE];
		while(deque.size()>=RECV_SIZE)
		{
			Byte first = deque.pollFirst();
			if(first == devAddr)
			{
				Byte second = deque.pollFirst();
				if(second == 0x03)
				{
					Byte third = deque.pollFirst();
					if(third == RECV_INNER_SIZE)
					{
						for(int i=0;i<RECV_INNER_SIZE;i++)
						{
							recvBuff[i] = deque.pollFirst();
						}
						deque.pollFirst();
						deque.pollFirst();
						dealRecvBuff(recvBuff);
					}
				}
			}
		}
	}
	
	public void dealRecvBuff(byte[] recvBuff)
	{
		System.out.println(CommUtil.bytesToHex(recvBuff));
		refreshFlag = true;
		getModbusPojo(recvBuff);
//		modbusPojo.print();
	}
	
	public void getModbusPojo(byte[] recvBuff)
	{
		int temp;
		synchronized (modbusPojo)
		{
			for(int i=0;i<recvBuff.length;)
			{
				switch(i)
				{
					case 0:
						temp = ModbusUtil.getSignedAns(recvBuff, 0, 1);
						double envTemperature = temp*0.1;
						modbusPojo.setEnvTemperature(envTemperature);
						break;
					case 2:
						temp = ModbusUtil.getSignedAns(recvBuff, 2, 3);
						double temperature = temp*0.1;
						modbusPojo.setTemperature(temperature);
						break;
					case 4:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 4, 5);
						double humidity = temp*0.1;
						modbusPojo.setHumidity(humidity);
						break;
					case 6:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 6, 7);
						double pressure = temp*0.1;
						modbusPojo.setPressure(pressure);
						break;
					case 8:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 8, 9);
						modbusPojo.setIrradiance(temp);
						break;
					case 10:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 10, 11);
						modbusPojo.setScaIrradiance(temp);
						break;
					case 12:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 12, 13);
						modbusPojo.setDirIrradiance(temp);
						break;
					case 14:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 14, 15);
						modbusPojo.setWindDir(temp);
						break;
					case 16:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 16, 17);
						double windSpeed = temp*0.1;
						modbusPojo.setWindSpeed(windSpeed);
						break;
					case 18:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 18, 19);
						double windSpeedTwo = temp*0.1;
						modbusPojo.setWindSpeedTwo(windSpeedTwo);
						break;
					case 20:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 20, 21);
						double windSpeedTen = temp*0.1;
						modbusPojo.setWindSpeedTen(windSpeedTen);
						break;
					case 22:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 22, 23);
						modbusPojo.setDailyExposure(temp);
						break;
					case 24:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 24, 25);
						double totalExposure = temp*0.001;
						modbusPojo.setTotalExposure(totalExposure);
						break;
					case 26:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 26, 27);
						double scaExposure = temp*0.001;
						modbusPojo.setScaExposure(scaExposure);
						break;
					case 28:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 28, 29);
						double dirExposure = temp*0.001;
						modbusPojo.setDirExposure(dirExposure);
						break;
				}
				i=i+2;
			}
		}
	}
	
	public void sendProcess()
	{
		byte[] message = new byte[8];
		int sendLen = 0;
		message[sendLen++] = devAddr;
		message[sendLen++] = 0x03;
		message[sendLen++] = 0x00;
		message[sendLen++] = 0x00;
		message[sendLen++] = 0x00;
		message[sendLen++] = 0x0F;
		byte[] crc = CommUtil.CRC16(message,6);
		message[sendLen++] = crc[0];
		message[sendLen++] = crc[1];
		comm.writeBuff(message);
	}

}


目录
相关文章
|
5天前
|
存储 Java 数据库连接
java多线程之线程通信
java多线程之线程通信
|
6天前
|
安全 Java 开发者
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第9天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将详细解析Java中的同步机制,包括synchronized关键字、Lock接口以及并发集合等,并探讨它们如何影响程序的性能。此外,我们还将讨论Java内存模型,以及它如何影响并发程序的行为。最后,我们将提供一些实用的并发编程技巧和最佳实践,帮助开发者编写出既线程安全又高效的Java程序。
20 3
|
9天前
|
Java 调度
Java并发编程:深入理解线程池的原理与实践
【4月更文挑战第6天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将从线程池的基本原理入手,逐步解析其工作过程,以及如何在实际开发中合理使用线程池以提高程序性能。同时,我们还将关注线程池的一些高级特性,如自定义线程工厂、拒绝策略等,以帮助读者更好地掌握线程池的使用技巧。
|
13天前
|
存储 NoSQL Java
Java数据库编程指南:实现高效数据存储与访问
【4月更文挑战第2天】Java开发者必须掌握数据库编程,尤其是JDBC,它是连接数据库的标准接口。使用Spring JDBC或JPA能简化操作。选择合适的JDBC驱动,如MySQL Connector/J,对性能至关重要。最佳实践包括事务管理、防SQL注入、优化索引和数据库设计。NoSQL数据库如MongoDB也日益重要,Java有对应的驱动支持。理解这些概念和技术是构建高效数据库应用的基础。
Java数据库编程指南:实现高效数据存储与访问
|
9天前
|
设计模式 安全 Java
Java并发编程实战:使用synchronized关键字实现线程安全
【4月更文挑战第6天】Java中的`synchronized`关键字用于处理多线程并发,确保共享资源的线程安全。它可以修饰方法或代码块,实现互斥访问。当用于方法时,锁定对象实例或类对象;用于代码块时,锁定指定对象。过度使用可能导致性能问题,应注意避免锁持有时间过长、死锁,并考虑使用`java.util.concurrent`包中的高级工具。正确理解和使用`synchronized`是编写线程安全程序的关键。
|
11天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【4月更文挑战第3天】 在Java并发编程中,线程池是一种重要的资源管理工具,它能有效地控制和管理线程的数量,提高系统性能。本文将深入探讨Java线程池的工作原理、应用场景以及优化策略,帮助读者更好地理解和应用线程池。
|
7天前
|
Java
Java 并发编程:深入理解线程池
【4月更文挑战第8天】本文将深入探讨 Java 中的线程池技术,包括其工作原理、优势以及如何使用。线程池是 Java 并发编程的重要工具,它可以有效地管理和控制线程的执行,提高系统性能。通过本文的学习,读者将对线程池有更深入的理解,并能在实际开发中灵活运用。
|
3天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。
|
7天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第7天】在现代软件开发中,多线程编程已经成为一种不可或缺的技术。为了提高程序性能和资源利用率,Java提供了线程池这一强大工具。本文将深入探讨Java线程池的原理、使用方法以及如何根据实际需求定制线程池,帮助读者更好地理解和应用线程池技术。
13 0
|
9天前
|
缓存 安全 Java
Java并发编程进阶:深入理解Java内存模型
【4月更文挑战第6天】Java内存模型(JMM)是多线程编程的关键,定义了线程间共享变量读写的规则,确保数据一致性和可见性。主要包括原子性、可见性和有序性三大特性。Happens-Before原则规定操作顺序,内存屏障和锁则保障这些原则的实施。理解JMM和相关机制对于编写线程安全、高性能的Java并发程序至关重要。