Programs
We have decided not to make the programs available as a download because this is not the point of the book - the programs are not finished production code but something you should type in and study.
The best solution is to provide the source code of the programs in a form that can be copied and pasted into a NetBeans of VS Code project.
Note: The VS Code task listings are at the very end of this page.
The only downside is that you have to create a project to paste the code into.
To do this follow the instructionin the book-- in particular make sure you have added any libraries to the link step and are running as root if required.
All of the programs below were copy and pasted from working programs in the IDE. They have been formatted using the built in formatter and hence are not identical in layout to the programs in the book. This is to make copy and pasting them easier. The programs in the book are formatted to be easy to read on the page.
If anything you consider important is missing or if you have any requests or comments contact:
This email address is being protected from spambots. You need JavaScript enabled to view it.
Page 32
import io
from time import sleep
fd = io.open("/sys/class/leds/led0/trigger", "w")
fd.write("none")
fd.close()
fd = io.open("/sys/class/leds/led0/brightness", "w")
while(True):
fd.write("0")
fd.flush()
sleep(1)
fd.write("1")
fd.flush()
sleep(1)
Page 33
import io
from time import sleep
fd = io.open("/sys/class/leds/led0/trigger", "w")
fd.write("timer")
fd.close()
fd = io.open("/sys/class/leds/led0/delay_on", "w")
fd.write("2000")
fd.close()
fd = io.open("/sys/class/leds/led0/delay_off", "w")
fd.write("3000")
fd.close()
Page 42
import gpiod
chip = gpiod.Chip("0")
print(chip.label())
print(chip.name())
print(chip.num_lines())
chip.close()
Page 45
import gpiod
chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="myprog.py",
type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
while(True):
line.set_value(1)
line.set_value(0)
Page 46
import gpiod
chip = gpiod.Chip("0")
lines = chip.get_lines([4, 17])
lines.request(consumer="myprog.py", type=gpiod.LINE_REQ_DIR_OUT,
default_vals=[0, 1])
while(True):
lines.set_values([1, 0])
lines.set_values([0, 1])
Page 48
import gpiod
from time import sleep, time_ns
chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_IN)
sleep(0.010)
line.release()
line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_OUT,
default_vals=[0])
sleep(0.010)
line.release()
line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_IN)
t = time_ns()
while(line.get_value() == 0):
pass
print((time_ns()-t)/1000)
Page 55
import fcntl
import struct
import io
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
f = io.open("/dev/gpiochip0", "rb", buffering=0)
gpiochip_info = struct.Struct("32s 32s L")
buffer = gpiochip_info.pack(b' ', b' ', 0)
result = fcntl.ioctl(f, GPIO_GET_CHIPINFO_IOCTL, buffer)
name, label, lines = gpiochip_info.unpack(result)
print(name.rstrip(b'\0').decode("utf-8"))
print(label.rstrip(b'\0').decode("utf-8"))
print(lines)
Page 59
import fcntl
import struct
import io
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408
GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10
gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")
lines = [0]*64
lines[0] = 4
lines[1] = 17
values = [0]*64
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_OUTPUT,
*values, b"Output test", 2, 0)
f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
f.close()
fL = struct.unpack_from("L", buffer, 360)[0]
values[0] = 1
values[1] = 0
state1 = gpiohandle_data.pack(*values)
values[0] = 0
values[1] = 1
state2 = gpiohandle_data.pack(*values)
while(True):
result = fcntl.ioctl(fL, GPIOHANDLE_SET_LINE_VALUES_IOCTL, state1)
result = fcntl.ioctl(fL, GPIOHANDLE_SET_LINE_VALUES_IOCTL, state2)
Page 60
import fcntl
import struct
import io
import os
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408
GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10
gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")
lines = [0]*64
lines[0] = 4
lines[1] = 17
values = [0]*64
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_INPUT,
*values, b"Input test", 2, 0)
f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
f.close()
fL = struct.unpack_from("L", result, 360)[0]
inputBuffer = gpiohandle_data.pack(*values)
result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL,
inputBuffer)
os.close(fL)
print(result[0], result[1])
Page 61
import fcntl
import struct
import io
import os
from time import sleep, time_ns
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408
GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10
gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")
lines = [0]*64
lines[0] = 4
values = [0]*64
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_INPUT,
*values, b"Resistance test", 1, 0)
f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
fL = struct.unpack_from("L", result, 360)[0]
state = gpiohandle_data.pack(*values)
result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL, state)
sleep(0.01)
os.close(fL)
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_OUTPUT,
*values, b"Resistance test", 1, 0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
fL = struct.unpack_from("L", result, 360)[0]
fcntl.ioctl(fL, GPIOHANDLE_SET_LINE_VALUES_IOCTL, state)
sleep(0.01)
os.close(fL)
buffer = gpiohandle_request.pack(*lines, GPIOHANDLE_REQUEST_INPUT,
*values, b"Resistance test", 1, 0)
result = fcntl.ioctl(f, GPIO_GET_LINEHANDLE_IOCTL, buffer)
fL = struct.unpack_from("L", result, 360)[0]
t = time_ns()
result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL, state)
while(result[0] == 0):
result = fcntl.ioctl(fL, GPIOHANDLE_GET_LINE_VALUES_IOCTL, state)
print((time_ns()-t)/1000)
os.close(fL)
f.close()
Page 68
import gpiod
chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)
event = line.event_wait(sec=1)
if event:
print("Event on line 4")
else:
print("Time out")
Page 69
import gpiod
from time import sleep, time_ns
chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="resistor.py", type=gpiod.LINE_REQ_DIR_IN)
sleep(0.010)
line.release()
line.request(consumer="resistor.py",
type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
sleep(0.010)
line.release()
line.request(consumer="resistor.py",
type=gpiod.LINE_REQ_EV_RISING_EDGE)
t = time_ns()
event = line.event_read()
print((event.sec*1000000000+event.nsec-t)/1000)
Page 70
import gpiod
chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)
while(True):
while True:
event1 = line.event_read()
if event1.type == event1.RISING_EDGE:
break
while True:
event2 = line.event_read()
if event2.type == gpiod.LineEvent.FALLING_EDGE:
break
print((event2.sec-event2.sec)*1000000000 +
(event2.nsec-event1.nsec))
Page 71a
import gpiod
from time import sleep
chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="myprog.py", type=gpiod.LINE_REQ_DIR_IN)
print("Press the button", flush=True)
sleep(20)
if line.get_value() == 1:
print("button pressed")
else:
print("button not pressed")
Page 71b
import gpiod
from time import sleep
chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)
print("Press the button", flush=True)
sleep(20)
event = line.event_wait(sec=0)
if event:
print("button pressed")
else:
print("button not pressed")
Page 72
import gpiod
def event_poll(line):
event = line.event_wait(sec=0)
if event:
event = line.event_read()
return event
chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)
while True:
event = event_poll(line)
if event:
print(event.sec, flush=True)
Page 75
import fcntl
import struct
import io
import os
from time import sleep, time_ns
import select
GPIO_GET_CHIPINFO_IOCTL = 0x8044B401
GPIO_GET_LINEHANDLE_IOCTL = 0xC16CB403
GPIOHANDLE_SET_LINE_VALUES_IOCTL = 0xC040B409
GPIOHANDLE_GET_LINE_VALUES_IOCTL = 0xC040B408
GPIO_GET_LINEEVENT_IOCTL = 0xC030B404
GPIOHANDLE_REQUEST_OUTPUT = 0x02
GPIOHANDLE_REQUEST_INPUT = 0x01
GPIOHANDLE_REQUEST_ACTIVE_LOW = 0x04
GPIOHANDLE_REQUEST_OPEN_DRAIN = 0x08
GPIOHANDLE_REQUEST_OPEN_SOURCE = 0x10
GPIOEVENT_REQUEST_RISING_EDGE = 0x01
GPIOEVENT_REQUEST_FALLING_EDGE = 0x02
GPIOEVENT_REQUEST_BOTH_EDGES = 0x03
gpiochip_info = struct.Struct("32s 32s L")
gpiohandle_request = struct.Struct("64L L 64B 32s L L")
gpiohandle_data = struct.Struct("64B")
gpioevent_request = struct.Struct("L L L 32s L")
gpioevent_data = struct.Struct("Q L")
buffer = gpioevent_request.pack(4, GPIOHANDLE_REQUEST_INPUT,
GPIOEVENT_REQUEST_BOTH_EDGES, b"Event test", 0)
f = io.open("/dev/gpiochip0", "rb", buffering=0)
result = fcntl.ioctl(f, GPIO_GET_LINEEVENT_IOCTL, buffer)
f.close()
fL = struct.unpack_from("L", result, 44)[0]
e = select.epoll()
e.register(fL)
while True:
events = e.poll(1)
if len(events) > 0:
buffer = os.read(fL, 100)
timestamp, id = gpioevent_data.unpack(buffer[0:12])
print(timestamp, id, flush=True)
Page 76
import gpiod
import struct
import io
import os
import select
gpioevent_data = struct.Struct("Q L")
chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)
fL = line.event_get_fd()
e = select.epoll()
e.register(fL)
while True:
events = e.poll(1)
if len(events) > 0:
buffer = os.read(fL, 100)
timestamp, id = gpioevent_data.unpack(buffer[0:12])
print(timestamp, id, flush=True)
Page 77
import gpiod
from time import sleep
import threading
def waitForInterrupt(line):
while(True):
event = line.event_read()
print(event.sec*1000000000+event.nsec, flush=True)
chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="myprog.py", type=gpiod.LINE_REQ_EV_BOTH_EDGES)
IntThread = threading.Thread(target=waitForInterrupt, args=(line,))
IntThread.start()
while True:
sleep(2)
print("running", flush=True)
Page 87
import io
fdr = io.open("/sys/bus/iio/devices/iio:device0/name", "rb", buffering=0)
name = fdr.readline().decode("utf-8")
print("name=", name)
fdr.close()
fdr = io.open("/sys/bus/iio/devices/iio:device0/in_temp_input",
"rb", buffering=0)
temp = fdr.readline().decode("utf-8")
print("temp=", int(temp)/1000, "C")
fdr.close()
fdr = io.open(
"/ sys/bus/iio/devices/iio: device0 /in_humidityrelative_input", "rb", buffering=0)
hum = fdr.readline().decode("utf-8")
print("Humidity=", int(hum)/1000, "%")
fdr.close()
Page 89
import subprocess
import io
import fcntl
def checkDht11():
indicator = "dht11 gpiopin=4"
command = ["sudo", "dtoverlay", "dht11", "gpiopin=4"]
temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output)
if output.find(indicator) == -1:
temp = subprocess.Popen(command, stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output)
return
checkDht11()
fdr = io.open("/sys/bus/iio/devices/iio:device0/name", "rb", buffering=0)
name = fdr.readline().decode("utf-8")
print("name=", name)
fdr.close()
fdr = io.open("/sys/bus/iio/devices/iio:device0/in_temp_input",
"rb", buffering=0)
temp = fdr.readline().decode("utf-8")
print("temp=", int(temp)/1000, "C")
fdr.close()
fdr = io.open(
"/sys/bus/iio/devices/iio:device0/in_humidityrelative_input", "rb", buffering=0)
hum = fdr.readline().decode("utf-8")
print("Humidity=", int(hum)/1000, "%")
fdr.close()
Page 111a
import gpiod
from time import sleep
chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="myprog.py",
type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
period = 20
duty = 25
ontime = period/1000 * duty / 100
offtime = period/1000 - ontime
while(True):
line.set_value(1)
sleep(ontime)
line.set_value(0)
sleep(offtime)
Page 111b
import gpiod
from time import sleep
import threading
def softPWM(line, period, duty):
ontime = period/1000 * duty / 100
offtime = period/1000 - ontime
while(True):
line.set_value(1)
sleep(ontime)
line.set_value(0)
sleep(offtime)
chip = gpiod.Chip("0")
line = chip.get_line(4)
line.request(consumer="myprog.py",
type=gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
period = 20
duty = 75
IntThread = threading.Thread(target=softPWM, args=(line, period, duty))
IntThread.start()
while(True):
print("working", flush=True)
sleep(2)
Page 115
import subprocess
import io
def checkPWM():
indicator = "pwm-2chan"
command = ["sudo", "dtoverlay", "pwm-2chan"]
temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output)
if output.find(indicator) == -1:
temp = subprocess.Popen(command, stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output)
return
checkPWM()
fdw = io.open("/sys/class/pwm/pwmchip0/export", "wb", buffering=0)
fdw.write(b"0")
fdw.close()
fdw = io.open("/sys/class/pwm/pwmchip0/pwm0/period", "wb", buffering=0)
fdw.write(b"10000000")
fdw.close()
fdw = io.open("/sys/class/pwm/pwmchip0/pwm0/duty_cycle", "wb", buffering=0)
fdw.write(b"8000000")
fdw.close()
fdw = io.open("/sys/class/pwm/pwmchip0/pwm0/enable", "wb", buffering=0)
fdw.write(b"1")
fdw.close()
Page 116
import subprocess
import io
from time import sleep
import os
class Pwm:
def __init__(self, channel, f, duty):
if not(channel == 0 or channel == 1):
return
self.chan = str(channel)
indicator = "pwm-2chan"
command = ["sudo", "dtoverlay", "pwm-2chan"]
temp = subprocess.Popen(
["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output, flush=True)
if output.find(indicator) == -1:
temp = subprocess.Popen(command, stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output, flush=True)
if not(os.path.exists("/sys/class/pwm/pwmchip0/pwm"+self.chan)):
fdw = io.open("/sys/class/pwm/pwmchip0/export", "w")
fdw.write(self.chan)
fdw.close()
while not(os.path.exists("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable")):
pass
self.fdwp = io.open("/sys/class/pwm/pwmchip0/pwm" +
self.chan+"/period", "w")
self.setFreq(f)
self.fdwd = io.open("/sys/class/pwm/pwmchip0/pwm" +
self.chan+"/duty_cycle", "w")
self.setDuty(duty)
def setFreq(self, f):
self.f = int(1000000000/f)
self.fdwp.write(str(self.f))
self.fdwp.flush()
def setDuty(self, duty):
self.duty = int(self.f*duty/100)
self.fdwd.write(str(self.duty))
self.fdwd.flush()
def enableChan(self):
fdw = io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable", "w")
fdw.write("1")
fdw.close()
def disableChan(self):
fdw = io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable", "w")
fdw.write("0")
fdw.close()
def closeChan(self):
self.fdwd.close()
self.fdwp.close()
fdw = io.open("/sys/class/pwm/pwmchip0/unexport", "w")
fdw.write(self.chan)
fdw.close()
Page 127
import subprocess
import io
from time import sleep
import os
class Pwm:
def __init__(self, channel, f, duty):
if not(channel == 0 or channel == 1):
return
self.chan = str(channel)
indicator = "pwm-2chan"
command = ["sudo", "dtoverlay", "pwm-2chan"]
temp = subprocess.Popen(
["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output, flush=True)
if output.find(indicator) != -1:
temp = subprocess.Popen(
["sudo", "dtoverlay", "-r", "0"], stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output, flush=True)
temp = subprocess.Popen(command, stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output, flush=True)
if os.path.exists("/sys/class/pwm/pwmchip0/pwm"+self.chan):
fdw = io.open("/sys/class/pwm/pwmchip0/unexport", "w")
fdw.write(self.chan)
fdw.close()
while os.path.exists("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable"):
pass
fdw = io.open("/sys/class/pwm/pwmchip0/export", "w")
fdw.write(self.chan)
fdw.close()
while not(os.path.exists("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/period")):
pass
self.fdwp = io.open("/sys/class/pwm/pwmchip0/pwm" +
self.chan+"/period", "w")
self.setFreq(f)
self.fdwd = io.open("/sys/class/pwm/pwmchip0/pwm" +
self.chan+"/duty_cycle", "w")
self.setDuty(duty)
def setFreq(self, f):
self.fdwp.write("200")
self.fdwp.flush()
self.f = int(1000000000/f)
self.fdwp.write(str(self.f))
self.fdwp.flush()
def setDuty(self, duty):
self.duty = int(self.f*duty/100)
self.fdwd.write(str(self.duty))
self.fdwd.flush()
def enableChan(self):
fdw = io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable", "w")
fdw.write("1")
fdw.close()
def disableChan(self):
fdw = io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/enable", "w")
fdw.write("0")
fdw.close()
def closeChan(self):
self.fdwd.close()
self.fdwp.close()
fdw = io.open("/sys/class/pwm/pwmchip0/unexport", "w")
fdw.write(self.chan)
fdw.close()
def inverChan(self):
fdw = io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/polarity", "w")
fdw.write("inversed")
fdw.close()
def normalChan(self):
fdw = io.open("/sys/class/pwm/pwmchip0/pwm"+self.chan+"/polarity", "w")
fdw.write("normal")
fdw.close()
Page 137
import subprocess
import spidev
from time import sleep
def checkSPI():
temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
output = str(temp.communicate())
lastSPI = output.rfind("spi")
if lastSPI != -1:
lastSPI = output.find("spi=on", lastSPI)
if lastSPI == -1:
temp = subprocess.Popen(
["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
output = str(temp.communicate())
checkSPI()
spi = spidev.SpiDev()
spi.open(0, 0)
spi.max_speed_hz = 5000
Data = [0xAA]
print(hex(Data[0]))
data = spi.xfer(Data)
print(hex(Data[0]))
if Data == 0xAA:
print("match")
spi.close()
Page 146
import subprocess
import spidev
def checkSPI():
temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
output = str(temp.communicate())
lastSPI = output.rfind("spi")
if lastSPI != -1:
lastSPI = output.find("spi=on", lastSPI)
if lastSPI == -1:
temp = subprocess.Popen(
["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
output = str(temp.communicate())
checkSPI()
spi = spidev.SpiDev()
spi.open(0, 0)
spi.max_speed_hz = 100000
spi.mode = 0
spi.bits_per_word = 8
Data = [0x01, 0x80, 0x00]
spi.xfer(Data)
raw = (Data[1] & 0x03) << 8 | Data[2]
print(raw)
volts = raw * 3.3 / 1023.0
print(volts, "V")
spi.close()
Page 165
import subprocess
import io
import fcntl
def checkI2CBus():
temp = subprocess.Popen(["sudo", "dtparam", "-l"],
stdout=subprocess.PIPE)
output = str(temp.communicate())
lasti2c = output.rfind("i2c_arm")
if lasti2c != -1:
lasti2c = output.find("i2c_arm=on", lasti2c)
if lasti2c == -1:
temp = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],
stdout=subprocess.PIPE)
output = str(temp.communicate())
return
checkI2CBus()
I2C_SLAVE = 0x0703
fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)
fdw.write(bytearray([0xE7]))
data = fdr.read(1)
print(data)
fdr.close()
fdw.close()
Page 167
import subprocess
import io
import fcntl
from time import sleep
checkI2CBus()
I2C_SLAVE = 0x0703
fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)
fdw.write(bytearray([0xF3]))
while(True):
try:
data = fdr.read(3)
break
except:
sleep(0.01)
msb = data[0]
lsb = data[1]
crc = data[2]
print("msb=", msb, " lsb=", lsb, " crc=", crc)
fdw.close()
fdr.close()
Page 171
import subprocess
import io
import fcntl
from time import sleep
def crcCheck(msb, lsb, check):
data32 = (msb << 16) | (lsb << 8) | check
divisor = 0x988000
for i in range(16):
if(data32 & 1 << (23 - i)):
data32 ^= divisor
divisor >>= 1
return data32
def checkI2CBus():
temp = subprocess.Popen(["sudo", "dtparam", "-l"],
stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output)
lasti2c = output.rfind("i2c_arm")
if lasti2c != -1:
lasti2c = output.find("i2c_arm=on", lasti2c)
if lasti2c == -1:
temp = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],
stdout=subprocess.PIPE)
output = str(temp.communicate())
return
checkI2CBus()
I2C_SLAVE = 0x0703
fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)
fdw.write(bytearray([0xF3]))
while(True):
try:
data = fdr.read(3)
break
except:
sleep(0.01)
msb = data[0]
lsb = data[1]
crc = data[2]
data16 = (msb << 8) | (lsb & 0xFC)
temp = -46.85 + (175.72 * data16 / 65536)
print("Temperature=", temp, "C")
fdw.write(bytearray([0xF5]))
while(True):
try:
data = fdr.read(3)
break
except:
sleep(0.01)
msb = data[0]
lsb = data[1]
crc = data[2]
data16 = (msb << 8) | (lsb & 0xFC)
hum = -6 + 125.0 * data16 / 65536
print("humidity=", hum, "%")
print(crcCheck(msb, lsb, crc))
fdw.close()
fdr.close()
Page 183
import subprocess
import io
import struct
import fcntl
from time import sleep
import mmap
import os
def setTimeout(timeout):
# find base address
peripherals_base = 0x20000000
try:
with io.open('/proc/device-tree/soc/ranges', 'rb') as f:
buf = f.read(16)
peripherals_base = buf[4] << 24 | buf[5] << 16 | buf[6] << 8 | buf[7] << 0
if peripherals_base == 0:
peripherals_base = buf[8] << 24 | buf[9] << 16 | buf[10] << 8 | buf[11] << 0
except:
pass
#open /dev/mem
try:
memfd = os.open('/dev/mem', os.O_RDWR | os.O_SYNC)
except OSError:
print('unable to open /dev/mem upgrade your kernel or run as root')
# map file from start of BCS1 into memory
mem = mmap.mmap(memfd, 4096, offset=peripherals_base + 0x804000)
os.close(memfd)
# Write a value to the clock stretch timeout register
struct.pack_into('<L', mem, 0x1C, timeout)
mem.flush
mem.close()
def crcCheck(msb, lsb, check):
data32 = (msb << 16) | (lsb << 8) | check
divisor = 0x988000
for i in range(16):
if(data32 & 1 << (23 - i)):
data32 ^= divisor
divisor >>= 1
return data32
def checkI2CBus():
temp = subprocess.Popen(["sudo", "dtparam", "-l"],
stdout=subprocess.PIPE)
output = str(temp.communicate())
lasti2c = output.rfind("i2c_arm")
if lasti2c != -1:
lasti2c = output.find("i2c_arm=on", lasti2c)
if lasti2c == -1:
temp = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],
stdout=subprocess.PIPE)
output = str(temp.communicate())
return
checkI2CBus()
I2C_SLAVE = 0x0703
fdr = io.open("/dev/i2c-1", "rb", buffering=0)
fdw = io.open("/dev/i2c-1", "wb", buffering=0)
setTimeout(10000)
fcntl.ioctl(fdr, I2C_SLAVE, 0x40)
fcntl.ioctl(fdw, I2C_SLAVE, 0x40)
fdw.write(bytearray([0xE3]))
data = fdr.read(3)
msb = data[0]
lsb = data[1]
crc = data[2]
data16 = (msb << 8) | (lsb & 0xFC)
temp = -46.85 + (175.72 * data16 / 65536)
print("Temperature=", temp, "C")
print(crcCheck(msb, lsb, crc))
fdw.write(bytearray([0xE5]))
data = fdr.read(3)
msb = data[0]
lsb = data[1]
crc = data[2]
data16 = (msb << 8) | (lsb & 0xFC)
hum = -6 + 125.0 * data16 / 65536
print("humidity=", hum, "%")
print(crcCheck(msb, lsb, crc))
fdw.close()
fdr.close()
Page 186
import subprocess
from ctypes import *
import os
import fcntl
import io
def checkI2CBus():
temp = subprocess.Popen(["sudo", "dtparam", "-l"],
stdout=subprocess.PIPE)
output = str(temp.communicate())
lasti2c = output.rfind("i2c_arm")
if lasti2c != -1:
lasti2c = output.find("i2c_arm=on", lasti2c)
if lasti2c == -1:
temp = subprocess.Popen(["sudo", "dtparam", "i2c_arm=on"],
stdout=subprocess.PIPE)
output = str(temp.communicate())
return
checkI2CBus()
I2C_SLAVE = 0x0703
I2C_FUNCS = 0x0705
I2C_FUNC_I2C = 0x00000001
I2C_FUNC_10BIT_ADDR = 0x00000002
I2C_FUNC_PROTOCOL_MANGLING = 0x00000004
I2C_FUNC_NOSTART = 0x00000010
I2C_FUNC_SLAVE = 0x00000020
i2cfd = os.open("/dev/i2c-1", os.O_RDWR | os.O_SYNC)
support = c_uint32(0)
fcntl.ioctl(i2cfd, I2C_FUNCS, support)
support = support.value
if support & I2C_FUNC_I2C:
print("I2C Support")
if support & I2C_FUNC_10BIT_ADDR:
print("10 bit address Support")
if support & I2C_FUNC_PROTOCOL_MANGLING:
print("I2C Mangling Support")
if support & I2C_FUNC_NOSTART:
print("I2C Nostart Support")
if support & I2C_FUNC_SLAVE:
print("I2C Slave Support")
Page 191
import fcntl
import os
from ctypes import *
class Msgs(Structure):
_fields_ = [("addr", c_uint16),
("flags", c_uint16),
("len", c_uint16),
("buf", POINTER(c_uint8))
]
class MsgSet(Structure):
_fields_ = [("msgs", POINTER(Msgs)),
("nmsgs", c_uint32)
]
def i2cReadRegister(i2cfd, slaveaddr, reg, n):
I2C_RDWR = 0x0707
I2C_M_RD = 0x0001
msgs = (Msgs*2)()
msgs[0].addr = c_uint16(slaveaddr)
msgs[0].flags = c_uint16(0)
msgs[0].len = c_uint16(1)
msgs[0].buf = POINTER(c_uint8)(c_uint8(reg))
msgs[1].addr = c_uint16(slaveaddr)
msgs[1].flags = c_uint16(I2C_M_RD)
msgs[1].len = c_uint16(n)
buf = (c_uint8*n)(0)
msgs[1].buf = POINTER(c_uint8)(buf)
msgset = MsgSet()
msgset.msgs = POINTER(Msgs)(msgs)
msgset.nmsgs = c_uint32(2)
libc = CDLL("libc.so.6")
libc.ioctl(i2cfd, I2C_RDWR, byref(msgset))
return msgs[1].buf
def crcCheck(msb, lsb, check):
data32 = (msb << 16) | (lsb << 8) | check
divisor = 0x988000
for i in range(16):
if(data32 & 1 << (23 - i)):
data32 ^= divisor
divisor >>= 1
return data32
i2cfd = os.open("/dev/i2c-1", os.O_RDWR | os.O_SYNC)
data = i2cReadRegister(i2cfd, 0x40, 0xE3, 3)
msb = data[0]
lsb = data[1]
crc = data[2]
data16 = (msb << 8) | (lsb & 0xFC)
temp = -46.85 + (175.72 * data16 / 65536)
print("Temperature=", temp, "C")
print(hex(msb), hex(lsb), hex(crc))
print(crcCheck(msb, lsb, crc))
Page 200
import subprocess
import io
def checkLM75():
indicator1 = "i2c_arm=on"
command1 = ["sudo", "dtparam", "i2c_arm=on"]
indicator2 = "lm75"
command2 = ["sudo", "dtoverlay", "i2c-sensor", "lm75", "addr=0x48"]
temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output, flush=True)
if output.find(indicator1) == -1:
temp = subprocess.Popen(command1, stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output, flush=True)
if output.find(indicator2) == -1:
temp = subprocess.Popen(command2, stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output, flush=True)
checkLM75()
fdr = io.open("/sys/class/hwmon/hwmon2/temp1_input", "r")
buf = fdr.read()
print(buf)
temp = int(buf) / 1000
print(temp)
fdr.close()
fdw = io.open("/sys/class/hwmon/hwmon2/temp1_max", "w")
fdw.write("19000")
fdw.close()
fdw = io.open("/sys/class/hwmon/hwmon2/temp1_max_hyst", "w")
fdw.write("18000")
fdw.close()
Page 204
import subprocess
import io
def loadHtu21():
temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output)
if output.find("htu21") == -1:
temp = subprocess.Popen(
["sudo", "dtoverlay", "i2c-sensor", "htu21"], stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output)
return
loadHtu21()
fdrt = io.open("/sys/bus/iio/devices/iio:device0/in_temp_input", "r")
print(int(fdrt.read())/1000)
fdrh = io.open(
"/sys/bus/iio/devices/iio:device0/in_humidityrelative_input", "r")
print(int(fdrh.read())/1000)
Page 217
import subprocess
import io
def load1w(pin):
indicator = "w1-gpio"
command = ["sudo", "dtoverlay", "w1-gpio", "gpiopin="+str(pin)]
temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output, flush=True)
if output.find(indicator) == -1:
temp = subprocess.Popen(command, stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output, flush=True)
def getDevices():
fdr = io.open(
"/sys/bus/w1/drivers/w1_master_driver/w1_bus_master1/w1_master_slaves", "r")
buffer = fdr.read()
fdr.close()
return buffer.split()
def getData(dev, name):
fdr = io.open("/sys/bus/w1/devices/"+dev+"/"+name, "r")
data = fdr.read()
fdr.close()
return data
load1w(4)
devs = getDevices()
name = getData(devs[0], "name")
print("Name ", name)
resolution = getData(devs[0], "resolution")
print("Resolution ", resolution)
w1_slave = getData(devs[0], "w1_slave")
print("w1_slave ", w1_slave)
temperature = getData(devs[0], "temperature")
print("temperature ", temperature)
temperature = int(temperature) / 1000
print("temperature ", temperature, "C")
alarms = getData(devs[0], "alarms")
print("Alarms ", alarms)
Page 228
from ctypes import *
from socket import *
from struct import *
import os
import subprocess
import io
from time import sleep
NLMSG_DONE = 0x3
CN_W1_IDX = 0x3
CN_W1_VAL = 0x1
W1_SLAVE_ADD = 0
W1_SLAVE_REMOVE = 1
W1_MASTER_ADD = 2
W1_MASTER_REMOVE = 3
W1_MASTER_CMD = 4
W1_SLAVE_CMD = 5
W1_LIST_MASTERS = 6
W1_CMD_READ = 0
W1_CMD_WRITE = 1
W1_CMD_SEARCH = 2
W1_CMD_ALARM_SEARCH = 3
W1_CMD_TOUCH = 4
W1_CMD_RESET = 5
W1_CMD_SLAVE_ADD = 6
W1_CMD_SLAVE_REMOVE = 7
W1_CMD_LIST_SLAVES = 8
W1_CMD_MAX = 9
NETLINK_CONNECTOR = 11
nl_seq = 0
class nlmsghdr(Structure):
_fields_ = [("nlmsg_len", c_uint32),
("nlmsg_type", c_uint16),
("nlmsg_flags", c_uint16),
("nlmsg_seq", c_uint32),
("nlmsg_pid", c_uint32)
]
cnh = nlmsghdr()
cnh.nlmsg_seq = c_uint32(nl_seq)
nl_seq += 1
cnh.nlmsg_pid = c_uint32(os.getpid())
cnh.nlmsg_type = c_uint16(NLMSG_DONE)
cnh.nlmsg_flags = c_uint16(0)
class cn_msg(Structure):
_fields_ = [("idx", c_uint32),
("val", c_uint32),
("seq", c_uint32),
("ack", c_uint32),
("len", c_uint16),
("flags", c_uint16),
]
cmsg = cn_msg()
cmsg.idx = c_uint32(CN_W1_IDX)
cmsg.val = c_uint32(CN_W1_VAL)
cmsg.seq = c_uint32(cnh.nlmsg_seq)
cmsg.ack = c_uint32(0)
class w1_netlink_msg(Structure):
_fields_ = [("type", c_uint8),
("status", c_uint8),
("len", c_uint16),
("id", c_uint8*8),
]
msg = w1_netlink_msg()
msg.type = c_uint8(W1_MASTER_CMD)
msg.id = (c_uint8*8).from_buffer_copy(c_uint64(1))
class w1_netlink_cmd(Structure):
_fields_ = [("cmd", c_uint8),
("res", c_uint8),
("len", c_uint16),
]
w1_cmd = w1_netlink_cmd()
w1_cmd.cmd = c_uint8(W1_CMD_SEARCH)
w1_cmd.len = c_uint16(0)
msg.len = c_uint16(len(bytearray(w1_cmd)) + w1_cmd.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)
s = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR)
s.connect((0, AF_NETLINK))
buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg)+bytearray(w1_cmd)
n = s.send(buffer)
print("Bytes sent", n, flush=True)
buffer2 = s.recv(65535)
print("Bytes received", len(buffer2), flush=True)
p = 0
cn_cnhr = nlmsghdr.from_buffer_copy(buffer2, p)
p = p+len(bytes(cn_cnhr))
cmsgr = cn_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(cmsgr))
msgr = w1_netlink_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(msgr))
w1_cmdr = w1_netlink_cmd.from_buffer_copy(buffer2, p)
num = int(w1_cmdr.len/8)
print("number of slaves", num)
p = p+len(bytes(w1_cmdr))
slaves = []
for i in range(num):
slaves.append(c_uint64.from_buffer_copy(buffer2, p+i*8).value)
print([hex(slave) for slave in slaves])
# GET ACK
buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)
# READ TEMPERATURE
# SET UP SLAVE COMMAND
print()
print("Read Temperature")
msg.type = W1_SLAVE_CMD
id = slaves[0].to_bytes(8, "little")
msg.id = (c_uint8*8).from_buffer_copy(id)
# SETUP AND SEND WRITE 0x44 = CONVERT
w1_cmd.cmd = c_uint8(W1_CMD_WRITE)
w1_cmd_data = c_uint8(0x44)
cnh.nlmsg_seq = c_uint32(nl_seq)
nl_seq += 1
cmsg.seq = c_uint32(cnh.nlmsg_seq)
w1_cmd.len = c_uint16(1)
msg.len = c_uint16(len(bytearray(w1_cmd)) + w1_cmd.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)
buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg) + \
bytearray(w1_cmd)+bytearray(w1_cmd_data)
n = s.send(buffer)
print("Bytes sent", n, flush=True)
buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)
sleep(1)
# READ SCRATCH PAD
cnh.nlmsg_seq = nl_seq
nl_seq += 1
cmsg.seq = cnh.nlmsg_seq
w1_cmd.cmd = W1_CMD_WRITE
w1_cmd_data = c_uint8(0xBE)
w1_cmd2 = w1_netlink_cmd()
w1_cmd2.cmd = c_uint8(W1_CMD_READ)
w1_cmd.len = c_uint16(1)
w1_cmd2.len = c_uint16(9)
msg.len = c_uint16(len(bytearray(w1_cmd)) +
len(bytearray(w1_cmd2)) + w1_cmd.len+w1_cmd2.len)
cmsg.len = c_uint16(len(bytearray(msg)) + msg.len)
cnh.nlmsg_len = c_uint32(len(bytearray(cnh))+len(bytearray(cmsg))+cmsg.len)
w1_cmd2_data = (c_uint8*9)(0)
buffer = bytearray(cnh)+bytearray(cmsg)+bytearray(msg)+bytearray(w1_cmd) + \
bytearray(w1_cmd_data)+bytearray(w1_cmd2)+bytearray(w1_cmd2_data)
n = s.send(buffer)
print("Bytes sent", n, flush=True)
buffer2 = s.recv(65535)
print("length of ack received ", len(buffer2), flush=True)
buffer2 = s.recv(65535)
print("length of data received ", len(buffer2), flush=True)
p = 0
cn_cnhr = nlmsghdr.from_buffer_copy(buffer2, p)
# print(cn_cnhr.nlmsg_len)
p = p+len(bytes(cn_cnhr))
cmsgr = cn_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(cmsgr))
msgr = w1_netlink_msg.from_buffer_copy(buffer2, p)
p = p+len(bytes(msgr))
w1_cmdr = w1_netlink_cmd.from_buffer_copy(buffer2, p)
p = p+len(bytes(w1_cmdr))
temp = c_uint16.from_buffer_copy(buffer2, p)
print("number of bytes found ", w1_cmdr.len, flush=True)
print(temp.value/16)
VSCODE TASKS
settings.json
which has to be customized to your IP, user name and folder:
{
"sshUser": "mike",
"sshEndpoint": "192.168.253.20",
"remoteDirectory": "/home/mike/Documents/${workspaceFolderBasename}",
}
tasks.json
{
"version": "2.0.0",
"tasks": [
{
"label": "copyToRemote",
"type": "shell",
"problemMatcher": [],
"presentation": {
"showReuseMessage": false,
"clear": true
}
},
{
"label": "makeRemoteWorkSpace",
"type": "shell",
"problemMatcher": [],
"presentation": {
"showReuseMessage": false,
}
},
{
"label": "RunR",
"type": "shell",
"problemMatcher": [],
"presentation": {
"showReuseMessage": false,
}
},
{
"label": "RunRemote",
"dependsOrder": "sequence",
"dependsOn": [
"copyToRemote",
"RunR"
],
"problemMatcher": [],
"group": {
"kind": "build",
"isDefault": true
},
},
{
"label": "StopREmotePython",
"type": "shell",
"problemMatcher": [],
"presentation": {
"showReuseMessage": true,
}
},
{
"label": "wait",
"type": "shell",
"command": "timeout 10"
},
{
"label": "tunnel",
"type": "shell",
"problemMatcher": [],
"presentation": {
"showReuseMessage": false,
}
},
{
"label": "startDebug",
"type": "shell",
"command": "ssh -2 ${config:
sshUser}@${config:sshEndpoint} 'nohup python3 -m debugpy --listen 0.0.0.0:5678 --wait-for-client ${config:remoteDirectory}/${relativeFileDirname}/${fileBasename} > /dev/null 2>&1 &'",
"problemMatcher": [],
"presentation": {
"showReuseMessage": false,
}
},
{
"label": "copyAndDebug",
"dependsOrder": "sequence",
"dependsOn": [
"copyToRemote",
"startDebug",
"wait"
],
"presentation": {
"showReuseMessage": false,
},
},
]
}
launch.json
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Remote Attach",
"type": "python",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5678
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}/${relativeFileDirname}/",
"remoteRoot": "${config:remoteDirectory}/${relativeFileDirname}"
}
],
"preLaunchTask": "copyAndDebug",
"postDebugTask": "StopREmotePython"
}
]
}