# server
import asyncio
import os
# Path of the Unix domain socket the server listens on.
SOCKET_PATH = '/tmp/unix_socket'
async def handle_client(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
    """Serve one client connection: read a request, reply, then close.

    Reads at most 100 bytes from the client, prints them together with the
    peer address, and answers with a fixed greeting.

    Args:
        reader: Stream the client's bytes are read from.
        writer: Stream the reply is written to. It is always closed before
            this coroutine finishes, whether or not the exchange succeeded.

    Raises:
        Exceptions raised by the stream operations propagate to the caller
        after the writer has been closed.
    """
    try:
        data = await reader.read(100)
        # The decoded text is only used for the log line below, so bytes
        # that are not valid UTF-8 (or a character cut at the 100-byte
        # limit) are replaced instead of aborting the handler.
        message = data.decode(errors="replace")
        addr = writer.get_extra_info('peername')
        print(f"Received {message!r} from {addr!r}")
        print("Send: Hello, Client!")
        writer.write(b'Hello, Client!')
        await writer.drain()
    finally:
        # Close the writer even when reading or replying fails, so a
        # misbehaving client cannot leave the connection open.
        print("Close the connection")
        writer.close()
        await writer.wait_closed()
async def main() -> None:
    """Start the Unix domain socket server and serve clients until cancelled.

    Anything already present at ``SOCKET_PATH`` (for example a socket file
    left behind by an earlier run) is removed before the server is started.
    Each accepted connection is handled by ``handle_client``.
    """
    # Attempt the removal directly instead of checking for existence first:
    # this avoids the check-then-delete race and also clears a dangling
    # symlink, which an existence check would report as absent.
    try:
        os.remove(SOCKET_PATH)
    except FileNotFoundError:
        pass
    server = await asyncio.start_unix_server(handle_client, SOCKET_PATH)
    # The context manager closes the server when serve_forever() is cancelled.
    async with server:
        await server.serve_forever()
if __name__ == '__main__':
    # Run the server coroutine on a new event loop.
    asyncio.run(main())


# client
import asyncio
import os
# Path of the Unix domain socket the client connects to.
SOCKET_PATH = '/tmp/unix_socket'
async def main() -> None:
    """Connect to the server, send a greeting, and print the reply.

    Opens a connection to the Unix domain socket at ``SOCKET_PATH``, sends
    ``b'Hello, Server!'``, reads at most 100 bytes of response, and prints it.

    Raises:
        Exceptions from connecting or from the stream operations propagate
        to the caller; once the connection has been opened it is closed
        before they do.
    """
    reader, writer = await asyncio.open_unix_connection(SOCKET_PATH)
    try:
        print("Send: Hello, Server!")
        writer.write(b'Hello, Server!')
        await writer.drain()
        data = await reader.read(100)
        # The reply is only displayed, so bytes that are not valid UTF-8 are
        # replaced rather than raising.
        message = data.decode(errors="replace")
        print(f"Received: {message!r}")
    finally:
        # Always release the connection, even if the exchange failed midway.
        print("Close the connection")
        writer.close()
        await writer.wait_closed()
if __name__ == "__main__":
    # Run the client coroutine to completion on a new event loop.
    asyncio.run(main())