IOT Raspberry Pi Programs

pyIoT360cover

We have decided not to make the programs available as a download because this is not the point of the book - the programs are not finished production code but something you should type in and study.

The best solution is to provide the source code of the programs in a form that can be copied and pasted into a NetBeans of VS Code project. 

 

Note: The VS Code task listings are at the very end of this page.

The only downside is that you have to create a project to paste the code into.

To do this follow the instructionin the book- in particular remember to add the bcm2835 library and also any library references that may be needed. 

All of the programs below were copy and pasted from working programs in the IDE. They have been formatted using the built in formatter and hence are not identical in layout to the programs in the book. This is to make copy and pasting them easier. The programs in the book are formatted to be easy to read on the page.

If anything you consider important is missing or if you have any requests or comments  contact:

This email address is being protected from spambots. You need JavaScript enabled to view it. 

Page 29

from gpiozero import LED
from time import sleep

led = LED(4)

while True:
    led.on()
    sleep(1)
    led.off()
    sleep(1)
 

Page 54

from gpiozero import LED
from signal import pause

led = LED(4)
led.blink(on_time=1,off_time=1,n=100)
print("Program Complete")
pause()

Page 55

from gpiozero import LED
from time import sleep

led = LED(4)

while True:
    led.toggle()
    sleep(1)
    led.toggle()
    sleep(1)
 

Page 57

from gpiozero import Buzzer
from signal import pause

buzz = Buzzer(4)
buzz.beep(on_time=1,off_time=1,n=100)
print("Program Complete")
pause()
 

Page 58

from gpiozero import DigitalOutputDevice
class Lock(DigitalOutputDevice):
  def __init__(self,*args,**kwargs):
      if 'active_high' in kwargs:
          raise TypeError("active_high not supported")
      super().__init__(*args,**kwargs)            
  def lock(self):
      super().on()
  def unlock(self):
      super().off()
  def on(self):
      raise AttributeError("'Lock' object has no attribute 'on'")
  def off(self):
      raise AttributeError("'Lock' object has no attribute 'off'")
  def blink(self):
      raise AttributeError("'Lock' object has no attribute 'blink'")
 

Page 59

from gpiozero import LED
led1 = LED(4)
led2 = LED(17)
while True:
    led1.on()
    led2.on()
    led1.off()
    led2.off()
  

Page 65

from gpiozero import Device
from time import sleep
Device()
pin=Device.pin_factory.pin(4)
pin._set_function('output')
while True:
    pin.state=1
    sleep(1)
    pin.state=0
    sleep(1) 

Page 92

from gpiozero import Button
from signal import pause
button = Button(4, bounce_time=0.25)


def pressed():
    print(“Button pressed”)


def released():
    print(“Button released”)


button.when_pressed = pressed
button.when_released = released
pause()

Page 95

from gpiozero import Button
from gpiozero import LED
from time import sleep

button = Button(4)
led1 = LED(17, initial_value=True)
led2 = LED(27)
led3 = LED(22)

state = 0
buttonState = button.value
while True:
    buttonNow = button.value
    buttonDelta = buttonNow-buttonState
    buttonState = buttonNow
    if state == 0:
        if buttonDelta == 1:
            state = 1
            led1.off()
            led2.on()
            led3.off()

        continue
    if state == 1:
        if buttonDelta == 1:
            state = 2
            led1.off()
            led2.on()
            led3.off()
        continue
    if state == 2:
        if buttonDelta == 1:
            state = 0
            led1.on()
            led2.off()
            led3.off()
        continue
    sleep(0.1)

 

Page 96

 

from gpiozero import Button
from time import perf_counter_ns

button = Button(4)
while True:
    button.wait_for_press()
button.wait_for_release()
t1 = perf_counter_ns()
button.wait_for_press()
t2 = perf_counter_ns()
print((t2-t1)/1000000)

 

 

Page 97

 
from gpiozero import Button
from time import perf_counter_ns
button = Button(4)
while True:
    while not button.value:
        pass
    while button.value:
        pass
    t1 = perf_counter_ns()
    while not button.value:
        pass
    t2 = perf_counter_ns()
    print((t2-t1)/1000000)
 

Page 97

from gpiozero import DigitalInputDevice
from time import perf_counter_ns
pulse = DigitalInputDevice(4)
while True:
    while not pulse.value:
        pass
    while pulse.value:
        pass
    t1 = perf_counter_ns()
    while not pulse.value:
        pass
    t2 = perf_counter_ns()
    print((t2-t1)/1000000)
 

Page 98

 
from gpiozero import Device
from time import perf_counter_ns

Device()
pulse = Device.pin_factory.pin(4)
pulse.input_with_pull("up")
while True:
    while not pulse.state:
        pass
    while pulse.state:
        pass
    t1 = perf_counter_ns()
    while not pulse.state:
        pass
    t2 = perf_counter_ns()
    print((t2-t1)/1000000)
 

Page 98/99

from gpiozero import DigitalInputDevice


class Door(DigitalInputDevice):
    def __init__(self, pin=None, pull_up=True, active_state=None,
                 bounce_time=None, pin_factory=None):
        super(Door, self).__init__(
            pin, pull_up=pull_up, active_state=active_state,
            bounce_time=bounce_time, pin_factory=pin_factory)

    @property
    def value(self):
        return super(Door, self).value


Door.is_closed = Door.is_active
Door.when_open = Door.when_deactivated
Door.when_closed = Door.when_activated
Door.wait_for_open = Door.wait_for_inactive
Door.wait_for_close = Door.wait_for_active
 

Page 110

 
from gpiozero import LightSensor


class Moisture(LightSensor):
    def __init__(self, pin=None, queue_len=5, charge_time_limit=0.01, threshold=0.1, partial=False, pin_factory=None):
        super(Moisture, self).__init__(pin, threshold=threshold, queue_len=queue_len,
                                       charge_time_limit=charge_time_limit, pin_factory=pin_factory)


Moisture.wait_for_dry = Moisture.wait_for_light
Moisture.wait_for_wet = Moisture.wait_for_dark
Moisture.when_dry = Moisture.when_light
Moisture.when_wet = Moisture.when_dark

Page 114

from gpiozero import PWMLED, DistanceSensor
from gpiozero.tools import inverted
from signal import pause
led = PWMLED(4)
dist = DistanceSensor(5, 6)
led.source = inverted(dist)
pause

Page 115

 
from gpiozero import PWMLED, DistanceSensor
from gpiozero.tools import inverted, averaged
from signal import pause

led = PWMLED(4)
front = DistanceSensor(5, 6)
back = DistanceSensor(7, 8)
led.source = averaged(front, back)
pause

Page 116

from signal import pause
from gpiozero import PWMLED, DistanceSensor
from gpiozero.tools import _normalize


def mined(*values):
    values = [_normalize(v) for v in values]
    for v in zip(*values):
        yield min(v)


led = PWMLED(4)
front = DistanceSensor(5, 6)
back = DistanceSensor(7, 8)
led.source = mined(front, back)
pause

Page 117

def PWM_values(period=360, duty=0.5):
    _ontime = int(period*duty)
    _offtime = period-_ontime
    data = [1]*_ontime+[0]*_offtime
    while True:
        for i in range(_ontime):
            print(1)
            yield 1
        for i in range(_offtime):
            print(0)
            yield 0

Page 119

import io
from gpiozero import InternalDevice


class MemoryFree(InternalDevice):
    def __init__(self, threshold=0.9, memfile='/proc/meminfo',
                 pin_factory=None):
        super(MemoryFree, self).__init__(pin_factory=pin_factory)
        self._fire_events(self.pin_factory.ticks(), None)
        self.memfile = memfile
        self.threshold = threshold
        self._totalmem = 0
        self._availablemem = 0

    @property
    def free_mem(self):
        with io.open(self.memfile, 'r') as f:
            self._totalmem = int(f.readline().strip().split()[1])
            freemem = int(f.readline().strip().split()[1])
            self._availablemem =
            int(f.readline().strip().split()[1])
            return freemem

    @property
    def value(self):
        return self.free_mem/self._availablemem

    @property
    def is_active(self):
        return self.value < 1-self.threshold

Page 126

from gpiozero import PWMOutputDevice
from signal import pause
pwm = PWMOutputDevice(4)
pwm.frequency = 10000
pwm.value = 0.5
pause()

Page 129

from gpiozero import PWMOutputDevice
pwm = PWMOutputDevice(4)
pwm.frequency = 1000
while True:
    pwm.value = 0.1
    pwm.value = 0.9

Page 129

from gpiozero import PWMOutputDevice
from time import sleep
pwm = PWMOutputDevice(4)
pwm.frequency = 1000
pwm.value = 0.1
while True:
    sleep(0.5)
    pwm.toggle()

Page 131

from gpiozero import PWMOutputDevice
from time import sleep

pwm = PWMOutputDevice(4)
pwm.frequency = 1000
steps = 8
delay = 0.01
while True:
    for d in range(steps):
        pwm.value = d/steps
    sleep(delay)

for d in range(steps):
    pwm.value = (steps-d)/steps
    sleep(delay)

Page 138

from gpiozero import PWMOutputDevice
from time import sleep

pwm = PWMOutputDevice(4)
pwm.frequency = 1000
steps = 8
delay = 0.001
while True:
    for d in range(steps):
        pwm.value = d/steps
        sleep(delay)

    for d in range(steps):
        pwm.value = (steps-d)/steps
        sleep(delay)

Page 145

class uniMotor(PWMOutputDevice):
    def __init__(self, pin=None, active_high=True, initial_value=0, pin_factory=None):
        super(uniMotor, self).__init__(pin, active_high,
                                       initial_value, pin_factory=pin_factory)

    def speed(self, value):
        self._write(value)


motor = uniMotor(4)
motor.speed(0.5)
sleep(10)

Page 154

from gpiozero import Servo
from time import sleep

servo = Servo(4)

while True:
    servo.min()
    sleep(0.5)
    servo.mid()
    sleep(0.5)
    servo.max()
    sleep(0.5)

Page 155

from gpiozero import Servo


class ServoInvert(Servo):


@Servo.value.setter
  def value(self, value):
       if value is None:
            self.pwm_device.pin.frequency = None
        elif -1 <= value <= 1:
            self.pwm_device.pin.frequency = int(1 / self.frame_width)
            self.pwm_device.value = (self._min_dc + self._dc_range *((value - self._min_value) / self._value_range))
        else:
            raise OutputDeviceBadValue(
                "Servo value must be between -1 and 1, or None")

Page 164

from gpiozero import DigitalOutputDevice
from time import sleep


class StepperBi4():
    def __init__(self, A=None, Am=None, B=None, Bm=None):
        self.phase = 0
        self.gpios = tuple(DigitalOutputDevice(pin) for pin in (A, B, Am, Bm))
        self.halfstepSeq = [
            [1, 0, 0, 0],
            [1, 1, 0, 0],
            [0, 1, 0, 0],
            [0, 1, 1, 0],
            [0, 0, 1, 0],
            [0, 0, 1, 1],
            [0, 0, 0, 1],
            [1, 0, 0, 1]
        ]

    def setPhase(self, phase):
        self.phase = phase
        for gpio, state in zip(self.gpios, self.halfstepSeq[phase]):
            gpio.value = state

    def stepForward(self):
        self.phase = (self.phase+1) % 8
        self.setPhase(self.phase)

    def stepReverse(self):
        self.phase = (self.phase-1) % 8
        self.setPhase(self.phase)


step = StepperBi4(4, 17, 27, 22)
step.setPhase(0)
while True:
    sleep(0.001)
    step.stepForward()

Page 170

from gpiozero import LED
from time import sleep


class TriLED():
    def __init__(self, pin1=None, pin2=None, pin3=None):
        self.LEDs = (LED(pin1), LED(pin2), LED(pin3))

    def AllOn(self):
        self.LEDs[0].on()
        self.LEDs[1].on()
        self.LEDs[2].on()

    def AllOff(self):
        self.LEDs[0].off()
        self.LEDs[1].off()
        self.LEDs[2].off()

Page 171

 class TriLED(CompositeDevice):
       def __init__(self, pin1=None, pin2=None, pin3=None):
            super(TriLED, self).__init__(LED(pin1), LED(pin2), LED(pin3))

        def AllOn(self):
            self[0].on()
            self[1].on()
            self[2].on()

        def AllOff(self):
            self[0].off()
            self[1].off()
            self[2].off()

Page 183

from gpiozero import DigitalOutputDevice, CompositeDevice


class StepperBi4(CompositeDevice):
    def __init__(self, A=None, Am=None, B=None, Bm=None):
        if not all(p is not None for p in [A, Am, B, Bm]):
            raise GPIOPinMissing(
                'Four GPIO pins must be provided'
            )
        super(StepperUni4, self).__init__(DigitalOutputDevice(A), DigitalOutputDevice(
            B), DigitalOutputDevice(Am), DigitalOutputDevice(Bm))
        self.phase = 0
        self.halfstepSeq = [
            [1, 0, 0, 0],
            [1, 1, 0, 0],
            [0, 1, 0, 0],
            [0, 1, 1, 0],
            [0, 0, 1, 0],
            [0, 0, 1, 1],
            [0, 0, 0, 1],
            [1, 0, 0, 1]
        ]

    def setPhase(self, phase):
        self.phase = phase
        for gpio, state in zip(self, self.halfstepSeq[phase]):
            gpio.pin._set_state(state)

    def stepForward(self):
        self.phase = (self.phase+1) % 8
        self.setPhase(self.phase)

    def stepReverse(self):
        self.phase = (self.phase-1) % 8
        self.setPhase(self.phase)

Page 186

from gpiozero import DigitalOutputDevice, CompositeDevice
from time import sleep
from gpiozero.threads import GPIOThread


class StepperBi4(CompositeDevice):
    def __init__(self, A=None, Am=None, B=None, Bm=None):
        if not all(p is not None for p in [A, Am, B, Bm]):
            raise GPIOPinMissing('Four GPIO pins must be provided')
        super(StepperUni4, self).__init__(DigitalOutputDevice(A), DigitalOutputDevice(
            B), DigitalOutputDevice(Am), DigitalOutputDevice(Bm))
        self.phase = 0
        self._rotate_thread = None
        self.halfstepSeq = [
            [1, 0, 0, 0],
            [1, 1, 0, 0],
            [0, 1, 0, 0],
            [0, 1, 1, 0],
            [0, 0, 1, 0],
            [0, 0, 1, 1],
            [0, 0, 0, 1],
            [1, 0, 0, 1]
        ]

    def setPhase(self, phase):
        self.phase = phase
        for gpio, state in zip(self, self.halfstepSeq[phase]):
            gpio.pin._set_state(state)

    def stepForward(self):
        self.phase = (self.phase+1) % 8
        self.setPhase(self.phase)

    def stepReverse(self):
        self.phase = (self.phase-1) % 8
        self.setPhase(self.phase)

    def forward(self, speed=0):
        self._stop_rotate()
        if speed == 0:
            return
        self._rotate_thread = GPIOThread(
            target=self._rotate, args=(+1, speed))
        self._rotate_thread.start()

    def reverse(self, speed=1):
        self._stop_rotate()
        if speed == 0:
            return
        self._rotate_thread = GPIOThread(
            target=self._rotate,   args=(-1, speed))
        self._rotate_thread.start()

    def _rotate(self, dir, speed):
        delay = 60/(speed*400)-0.001
        while True:
            if dir == 1:
                self.stepForward()
            if dir == -1:
                self.stepReverse()
            if self._rotate_thread.stopping.wait(delay):
                break

    def _stop_rotate(self):
        if getattr(self, '_rotate_thread', None):
            self._rotate_thread.stop()
        self._rotate_thread = None
 
Page 186
 
import subprocess
temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output)
lastSPI = output.rfind("spi")
if lastSPI != -1:
    lastSPI = output.find("spi=on", lastSPI)
    if lastSPI == -1:
        temp = subprocess.Popen(
            ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print("adding", output)
else:
    temp = subprocess.Popen(
        ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print("adding", output)
 
 Page 190
 
import subprocess
temp = subprocess.Popen(["sudo", "dtparam", "-l"], stdout=subprocess.PIPE)
output = str(temp.communicate())
print(output)
lastSPI = output.rfind("spi")
if lastSPI != -1:
    lastSPI = output.find("spi=on", lastSPI)
    if lastSPI == -1:
        temp = subprocess.Popen(
            ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
        output = str(temp.communicate())
        print("adding", output)
else:
    temp = subprocess.Popen(
        ["sudo", "dtparam", "spi=on"], stdout=subprocess.PIPE)
    output = str(temp.communicate())
    print("adding", output)
 Page 207
from gpiozero import SPIDevice
Dev = SPIDevice()
Dev.clock_mode = 0
Dev.select_high = False
Dev._spi._interface.max_speed_hz = 60000

words = Dev._spi.transfer([0x01, 0x80, 0x00])
data = (words[1] & 0x03) << 8 | words[2]
volts = data * 3.3 / 1023.0
print(volts)
Dev.close()
 Page 212
class DS3234rtc(SPIDevice):
    def __init__(self):
        super(DS3234rtc, self).__init__()
        self.clock_mode = 1
        self.select_high = False
        self._spi._interface.max_speed_hz = 5000000

    def setDateTime(self, dateTime):
        datetimetuple = dateTime.timetuple()
        datetimetuple = (datetimetuple[0]-2000,)+datetimetuple[1:3] + \
            (dateTime.isoweekday(),)+datetimetuple[3:6]
        datetimetuple = datetimetuple[::-1]
        for i in range(0, 7):
            data = datetimetuple[i]
            data = (data//10) << 4 | (data % 10)
            words = self._spi.transfer([0x80+i, data])

    def getDateTime(self):
        datetimelist = []
        for i in range(7):
            words = self._spi.transfer([i, 0x00])
            if i == 3:
                continue
            byte = words[1]
            data = (byte & 0x0F)+(byte >> 4)*10
            datetimelist.insert(0, data)
        datetimelist[0] += 2000
        return datetime(*datetimelist)
 Page 227 Appendix

settings.json

{
    "sshUser": "pi",
    "sshEndpoint": "192.168.11.170",
    "remoteDirectory": "/home/pi/Documents/${workspaceFolderBasename}", 
 
}

tasks.json

{
    "version": "2.0.0",
    "tasks": [
        {
            "label": "copyToRemote",
            "type": "shell",
            "command": "scp -r ${fileDirname} ${config:sshUser}@${config:sshEndpoint}:${config:remoteDirectory}/",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
                "clear": true
            }
        },
        {
            "label": "makeRemoteWorkSpace",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} ;'mkdir  ${config:remoteDirectory}'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "RunR",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} ;'python3 ${config:remoteDirectory}/${relativeFileDirname}/${fileBasename}'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "RunRemote",
            "dependsOrder": "sequence",
            "dependsOn": [
                "copyToRemote",
                "RunR"
            ],
            "problemMatcher": [],
            "group": {
                "kind": "build",
                "isDefault": true
            },
        },
        
        {
            "label": "StopRemotePython",
            "type": "shell",
            "command": "ssh ${config:sshUser}@${config:sshEndpoint} ;'pkill python3'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": true,
            }
        },
        {
            "label": "wait",
            "type": "shell",
            "command": "timeout 10"
        },
        {
            "label": "tunnel",
            "type": "shell",
            "command": "ssh -2 -L 5678:localhost:5678 ${config:sshUser}@${config:sshEndpoint}",
            "problemMatcher": [],
            "presentation": {                
                "showReuseMessage": false,             
            }
        },
        {
            "label": "startDebug",
            "type": "shell",
            "command": "ssh -2 ${config:sshUser}@${config:sshEndpoint} ; 'nohup python3 -m debugpy --listen 0.0.0.0:5678 --wait-for-client ${config:remoteDirectory}/${relativeFileDirname}/${fileBasename} > /dev/null 2>&1 &'",
            "problemMatcher": [],
            "presentation": {
                "showReuseMessage": false,
            }
        },
        {
            "label": "copyAndDebug",
            "dependsOrder": "sequence",
            "dependsOn": [
                "copyToRemote", 
                "startDebug",
                "wait"
            ],
            "presentation": {               
                "showReuseMessage": false,                
            },
        },
    ]
}